In [1]:
import os
import pyspark

# Start Spark session
conf = pyspark.SparkConf()
conf.setMaster("local").setAppName("My app")

sc = pyspark.SparkContext(conf=conf)
spark = pyspark.sql.SparkSession(sc)
spark

In [2]:
from pyspark.sql.functions import col, struct, collect_list, udf, first

In [3]:
metrics = spark \
                .read \
                .option("inferSchema", "true") \
                .option("header", "true") \
                .csv("data/RAML_tsmetrics.csv")

ts = spark \
                .read.format("csv") \
                .option("inferSchema", "true") \
                .option("header", "true") \
                .load(["data/RAML_timeseries1.csv", "data/RAML_timeseries2.csv"])

stocks = spark \
                .read \
                .option("inferSchema", "true") \
                .option("header", "true") \
                .csv("data/RAML_stock.csv")

In [4]:
# Join all dataframes together and only keep relevant columns
df = stocks \
    .join(ts, stocks["stockid"] == ts["stockid"], 'inner') \
    .select(stocks["stockid"].alias("Stock ID"), \
            stocks["commonname"].alias("Common Name"), \
            stocks["primary_country"].alias("Country"), \
            stocks["ISO3_code"].alias("Country Code"), \
            ts["tsyear"].alias("Year"), \
            ts["tsid"].alias("Measure ID"), \
            ts["tsvalue"].alias("Value")) \
    .where(col("Value").isNotNull())

df = df \
    .join(metrics, metrics["tsunique"] == df["Measure ID"], 'inner') \
    .select(df["Stock ID"], \
            df["Common Name"], \
            df["Country"], \
            df["Country Code"], \
            df["Year"], \
            df["Measure ID"], \
            metrics["tslong"].alias("Measure description"), \
            metrics["tsunitslong"].alias("Unit"), \
            df["Value"])

In [5]:
df.printSchema()

root
 |-- Stock ID: string (nullable = true)
 |-- Common Name: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Country Code: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Measure ID: string (nullable = true)
 |-- Measure description: string (nullable = true)
 |-- Unit: string (nullable = true)
 |-- Value: double (nullable = true)



In [6]:
# Create nested tables for Years and Measures
df = df.groupBy("Stock ID", "Common Name", "Country", "Country Code", "Year") \
           .agg(collect_list(struct("Measure ID", "Measure description", "Unit", "Value")).alias("Measures"))
         

df = df.groupBy("Stock ID", "Common Name", "Country", "Country Code") \
    .agg(first(struct("Year", "Measures")).alias("Years"))

In [7]:
df.printSchema()

root
 |-- Stock ID: string (nullable = true)
 |-- Common Name: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Country Code: string (nullable = true)
 |-- Years: struct (nullable = true)
 |    |-- Year: integer (nullable = true)
 |    |-- Measures: array (nullable = false)
 |    |    |-- element: struct (containsNull = false)
 |    |    |    |-- Measure ID: string (nullable = true)
 |    |    |    |-- Measure description: string (nullable = true)
 |    |    |    |-- Unit: string (nullable = true)
 |    |    |    |-- Value: double (nullable = true)



In [43]:
# Load the transformed data into MongoDB
import json
import pymongo
from dotenv import dotenv_values

from pyspark.sql.functions import to_json, from_json
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType

In [44]:
# Retrieve MongoDB creds
config = dotenv_values("creds.env")

user = config['user']
password = config['password']

# Connect to MongoDB
uri = "mongodb+srv://" + user + ":" + password + "@cluster0.6jfc5iw.mongodb.net/"
client = pymongo.MongoClient(uri)
db = client["gfw"]
collection = db["stocks"]

In [46]:
stocks_dict = df.toPandas().to_dict("records")

In [48]:
collection.insert_many(stocks_dict)

<pymongo.results.InsertManyResult at 0x17656be7fc0>

In [49]:
client.close()