# Data Storage

## Install libraries & packages

In [None]:
## Install libraries & install packages needed to run MongoDB
!pip3 install pymongo
import pymongo
import requests
import time
import matplotlib.pyplot as plt
import os
from IPython.display import clear_output
import pandas as pd
import seaborn as sns
# Make binaries under /usr/local/bin resolvable by the `!docker...` shell commands in later
# cells (presumably where docker / docker-compose are installed on the author's machine — TODO confirm).
# Append only when the entry is missing so re-running this cell does not keep growing PATH,
# and tolerate an unset PATH instead of raising KeyError.
_current_path = os.environ.get('PATH', '')
if '/usr/local/bin' not in _current_path.split(os.pathsep):
    os.environ['PATH'] = (_current_path + os.pathsep if _current_path else '') + '/usr/local/bin'

## Start Docker container

In [None]:
!docker-compose up -d

### Check if the services are running

In [None]:
!docker ps

## Connect to DB

The MongoDB connection (`localhost`, port `37017`) is opened in the CSV-processing cell below.

# Process CSV Data

### Install necessary packages

In [None]:
!pip install pymongo

In [None]:
import pandas as pd
from pymongo import MongoClient
from IPython.display import display

# Settings for this section. Port 37017 is presumably the host port docker-compose maps
# to MongoDB — confirm in docker-compose.yml.
POI_CSV_PATH = 'data/top-locations-wien.csv'
MONGO_HOST = 'localhost'
MONGO_PORT = 37017

# Read the points-of-interest CSV
poi_df = pd.read_csv(POI_CSV_PATH)

# Preview the first rows (rich display instead of a plain-text print)
display(poi_df.head())

# Connect to MongoDB
client = MongoClient(MONGO_HOST, MONGO_PORT)
db = client['citybike_vienna']
collection = db['top-locations-wien']

# Cast to object so values are plain Python scalars (not numpy types), and turn NaN
# (missing CSV cells) into None so they are stored as BSON null rather than float NaN.
poi_records = poi_df.astype(object).where(poi_df.notna(), None).to_dict('records')

# Replace the collection's contents instead of appending, so re-running this cell
# does not duplicate every document.
collection.delete_many({})

# insert_many rejects an empty document list, so skip the insert for an empty CSV.
if poi_records:
    collection.insert_many(poi_records)
    print("Data inserted into MongoDB")
else:
    print("No rows in CSV; nothing inserted")

# Set up Spark

In [None]:
import os
from pyspark.sql import SparkSession

# Maven coordinates of the Kafka connector for Structured Streaming, passed via
# `spark.jars.packages`. NOTE(review): the connector version (3.3.0) and Scala suffix
# (2.12) presumably need to match the installed PySpark build — confirm.
KAFKA_CONNECTOR_PACKAGE = "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0"

# Build the session step by step; getOrCreate() reuses an already-running session if any.
spark_builder = SparkSession.builder.appName('nextbike-data-consumer')
spark_builder = spark_builder.config("spark.jars.packages", KAFKA_CONNECTOR_PACKAGE)
spark = spark_builder.getOrCreate()

# Underlying SparkContext, exposed for RDD-level APIs.
sc = spark.sparkContext

print("Spark session created successfully.")

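## Transform and Analyze Data

Parse each Kafka message with an explicit JSON schema and flatten the nested countries → cities → places structure into one row per place.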

In [None]:
from pyspark.sql.functions import col, from_json, explode
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, ArrayType, LongType

# Define schema for the data.
# Each Kafka message value is parsed as JSON shaped like
#   {"countries": [{"name": ..., "cities": [{"name": ..., "places": [{...}, ...]}]}]}
# Every field below is declared nullable.


def _nullable_struct(*name_type_pairs):
    """Build a StructType from (field name, DataType) pairs, marking every field nullable."""
    return StructType([StructField(name, data_type, True) for name, data_type in name_type_pairs])


# Schema of a single entry in a city's `places` array.
# NOTE(review): confirm `bike` is numeric in the nextbike feed; if the API sends a JSON
# boolean here, IntegerType will not parse it.
place_schema = _nullable_struct(
    ("uid", LongType()),
    ("name", StringType()),
    ("bike", IntegerType()),
    ("bike_numbers", ArrayType(StringType())),
    ("free_racks", IntegerType()),
    ("bike_racks", IntegerType()),
    ("lat", DoubleType()),
    ("lng", DoubleType()),
)

city_schema = _nullable_struct(
    ("name", StringType()),
    ("places", ArrayType(place_schema)),
)

country_schema = _nullable_struct(
    ("name", StringType()),
    ("cities", ArrayType(city_schema)),
)

# Top-level message schema used with from_json.
data_schema = _nullable_struct(
    ("countries", ArrayType(country_schema)),
)

# Streaming source: the `nextbike_data` Kafka topic, replayed from the earliest offset.
kafka_reader = spark.readStream.format("kafka")
kafka_reader = kafka_reader.option("kafka.bootstrap.servers", "localhost:29093")
kafka_reader = kafka_reader.option("subscribe", "nextbike_data")
kafka_reader = kafka_reader.option("startingOffsets", "earliest")
df = kafka_reader.load()

# key/value arrive as binary; cast both to strings in place (all other columns are kept).
df1 = df.withColumn("key", col("key").cast(StringType()))
df1 = df1.withColumn("value", col("value").cast(StringType()))

# Decode the JSON payload into a single struct column named `data`.
df2 = df1.select(from_json(df1["value"], data_schema).alias("data"))

# Flatten countries -> cities -> places. explode emits one row per array element
# (rows whose array is null or empty produce no output rows).
df_countries = df2.select(explode(col("data.countries")).alias("country"))
df_cities = df_countries.select(
    col("country.name").alias("country_name"),
    explode(col("country.cities")).alias("city"),
)
df_places = df_cities.select(
    "country_name",
    col("city.name").alias("city_name"),
    explode(col("city.places")).alias("place"),
)

# (struct field on `place`, output column name) pairs, in output order.
_PLACE_COLUMNS = [
    ("uid", "place_uid"),
    ("name", "place_name"),
    ("bike", "bike"),
    ("bike_numbers", "bike_numbers"),
    ("free_racks", "free_racks"),
    ("bike_racks", "total_racks"),
    ("lat", "latitude"),
    ("lng", "longitude"),
]

df_formatted = df_places.select(
    col("country_name").alias("country"),
    col("city_name").alias("city"),
    *(col(f"place.{field}").alias(alias) for field, alias in _PLACE_COLUMNS),
)

# Show the schema of the flattened stream.
df_formatted.printSchema()

# API

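## Visualize the Streaming Data

The cell below queries the nextbike live REST API once and prints a preview of the flattened places table; it does not read the Kafka stream and produces no plot yet.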

In [None]:
# Public endpoint of the nextbike live feed queried by default.
NEXTBIKE_LIVE_URL = "https://api.nextbike.net/maps/nextbike-live.json"


def query_api_and_store_data(url=NEXTBIKE_LIVE_URL, timeout=30):
    """Fetch the nextbike live feed once and flatten it into one row per place.

    The payload's ``countries -> cities -> places`` nesting is flattened into a
    pandas DataFrame with columns ``country, city, place_uid, place_name, bike,
    bike_numbers, free_racks, total_racks, latitude, longitude``, and a
    ``head()`` preview is printed. Despite the name, nothing is written to
    storage; the DataFrame is returned to the caller.

    Args:
        url: Endpoint to query; defaults to the nextbike live feed.
        timeout: Seconds to wait for the HTTP response, so a stalled server
            cannot hang the notebook indefinitely.

    Returns:
        The flattened DataFrame, or ``None`` if fetching or parsing failed
        (the error is printed instead of raised).
    """
    try:
        response = requests.get(url, timeout=timeout)
        # Report HTTP 4xx/5xx directly instead of failing later on an unexpected body.
        response.raise_for_status()
        payload = response.json()

        records = []
        for country in payload['countries']:
            # A country/city without the nested list contributes no rows rather than
            # aborting the whole fetch with a KeyError.
            for city in country.get('cities', []):
                for place in city.get('places', []):
                    records.append({
                        'country': country['name'],
                        'city': city['name'],
                        'place_uid': place['uid'],
                        'place_name': place['name'],
                        'bike': place['bike'],
                        'bike_numbers': place.get('bike_numbers', []),
                        'free_racks': place.get('free_racks', None),
                        'total_racks': place.get('bike_racks', None),
                        'latitude': place['lat'],
                        'longitude': place['lng'],
                    })

        # Named distinctly so it does not read like the global streaming `df`.
        places_df = pd.DataFrame(records)
        print(places_df.head())
        return places_df

    except Exception as e:
        # Deliberately best-effort: report the problem and keep the notebook running.
        print("Error:", e)
        return None


# Keep the snapshot for later cells instead of discarding it.
nextbike_places_df = query_api_and_store_data()