Cluster:
3-node i3.xlarge cluster running Databricks Runtime 9.1 LTS (includes Apache Spark 3.1.2, Scala 2.12)

In [0]:
import os
from datetime import datetime
from urllib.parse import quote_plus

from pyspark.sql import SparkSession
from pyspark.sql.types import *

In [0]:
# Create (or reuse) the Spark session used throughout this notebook.
# NOTE(review): mongo-spark-connector 2.4.0 may not match the cluster's Spark 3.1.2 — verify
# compatibility. Also, if a session already exists, spark.jars.packages set here may not take
# effect; installing the connector as a cluster library is the safer route — confirm.
spark = (
    SparkSession.builder
    .appName('gp17')
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:2.4.0")
    # Long network timeout / heartbeat interval, presumably to tolerate the long-running
    # MongoDB write later in the notebook — confirm.
    .config("spark.network.timeout", "7200s")
    .config("spark.executor.heartbeatInterval", "1200s")
    .getOrCreate()
)

# Add configuration for accessing S3

In [0]:
# Read AWS credentials from the environment instead of typing them into the notebook,
# where they would end up in version control and shared copies.
aws_access_key = os.environ.get('AWS_ACCESS_KEY_ID', '')
aws_secret_key = os.environ.get('AWS_SECRET_ACCESS_KEY', '')

hadoop_conf = spark._jsc.hadoopConfiguration()
# Keys set directly on the Hadoop configuration take no "spark.hadoop." prefix; that prefix is
# only for Spark confs that Spark copies into the Hadoop configuration.
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
# Note: the hadoop-aws jar cannot be added through the Hadoop configuration (a former
# 'spark.jars.packages' entry here had no effect); provide it as a cluster library or via the
# SparkSession builder if it is needed.

# Only override credentials when both are provided, so the default S3A credential lookup
# (e.g. an instance profile) otherwise stays in effect.
if aws_access_key and aws_secret_key:
    hadoop_conf.set("fs.s3a.access.key", aws_access_key)
    hadoop_conf.set("fs.s3a.secret.key", aws_secret_key)

# Data Pre-processing:
### For simplicity, read files from S3 and join them to create an aggregate for this example.

1. We loaded the data from S3, keeping only the columns we needed from the COVID dataset and all columns from the e-commerce dataset.
2. For the COVID dataset, we safely converted each column to its desired type (values that cannot be converted become null).
3. Finally, we joined the two DataFrames on the date column so we could track both COVID cases and the amount of e-commerce purchases by day.

In [0]:
def IntegerSafe(value):
    """Convert ``value`` to ``int``, returning ``None`` if it cannot be converted.

    Applied to raw CSV fields, so empty or non-numeric strings become ``None``
    instead of failing the Spark job. ``None`` input also yields ``None``.
    """
    try:
        return int(value)
    except (ValueError, TypeError):  # TypeError: None or other non-convertible objects
        return None


def FloatSafe(value):
    """Convert ``value`` to ``float``, returning ``None`` if it cannot be converted.

    Applied to raw CSV fields, so empty or non-numeric strings become ``None``
    instead of failing the Spark job. ``None`` input also yields ``None``.
    """
    try:
        return float(value)
    except (ValueError, TypeError):  # TypeError: None or other non-convertible objects
        return None


def toTimeSafe(inval, fmt="%Y%m%d"):
    """Parse ``inval`` with ``datetime.strptime`` using ``fmt`` (default ``YYYYMMDD``).

    Returns a ``datetime`` (midnight of that day), or ``None`` when ``inval`` does
    not match ``fmt`` or is not a string.
    """
    try:
        return datetime.strptime(inval, fmt)
    except (ValueError, TypeError):  # TypeError: None or other non-string input
        return None


# getOrCreate() returns the already-created session; used here for the RDD-based covid load.
ss = SparkSession.builder.getOrCreate()
sc = ss.sparkContext

# Naive comma split: assumes no field in this CSV contains a quoted comma.
covid = sc.textFile("s3://msds697ecommerce/us_covid19_daily.csv")\
          .map(lambda x: x.split(','))

# Preprocessing - remove the header line (the file's first row).
header = covid.first()
covid = covid.filter(lambda x: x != header)

# Convert fields with the *Safe helpers (unparseable values become None), then sort by date.
# NOTE(review): fields 0-3 are mapped to date, positive, negative, hospitalizedCurrently;
# confirm these positions against the header of us_covid19_daily.csv.
covid = (covid
         .filter(lambda x: len(x) >= 4)  # skip blank/short lines instead of raising IndexError
         .map(lambda x: (toTimeSafe(x[0]), IntegerSafe(x[1]), IntegerSafe(x[2]), FloatSafe(x[3])))
         # Drop rows whose date failed to parse: sortBy cannot order None against datetimes,
         # and covid_df's schema declares "date" as non-nullable.
         .filter(lambda row: row[0] is not None)
         .sortBy(lambda row: row[0]))

# time efficiency: 20.14 seconds


In [0]:
# Schema for the cleaned covid RDD, one field per tuple position produced above.
# "date" is non-nullable; the counts may be null when a value failed to convert.
schema = (
    StructType()
    .add("date", DateType(), False)
    .add("positive", IntegerType(), True)
    .add("negative", IntegerType(), True)
    .add("hospitalizedCurrently", FloatType(), True)
)

In [0]:
covid_df = ss.createDataFrame(data=covid, schema=schema)  # typed DataFrame from the cleaned RDD

In [0]:
covid_df.show(n=50)  # preview the first 50 rows

In [0]:
# Main dataset: e-commerce events for March 2020.
ECOM_PATH = "s3://msds697ecommerce/2020-Mar.csv"

# Use a distinct name so the covid `schema` defined earlier is not overwritten; otherwise
# re-running the covid_df cell after this one would silently apply the e-commerce schema.
# event_time is read as DateType so it can be joined to covid_df.date by calendar day.
# NOTE(review): confirm Spark's CSV date parsing accepts the raw event_time text
# (no unexpected null event_time values).
ecom_schema = StructType([StructField("event_time", DateType(), True),
                          StructField("event_type", StringType(), True),
                          StructField("product_id", IntegerType(), True),
                          # presumably a string because ids exceed IntegerType's range — confirm
                          StructField("category_id", StringType(), True),
                          StructField("category_code", StringType(), True),
                          StructField("brand", StringType(), True),
                          StructField("price", FloatType(), True),
                          StructField("user_id", IntegerType(), True),
                          StructField("user_session", StringType(), True)])

ecom = spark.read.schema(ecom_schema).csv(ECOM_PATH, sep=',', header=True)
ecom.show()

In [0]:
# Data fusion: a left join keeps every e-commerce row; covid columns are null
# for event dates with no matching covid record.
joined_df = ecom.join(
    covid_df,
    on=ecom["event_time"] == covid_df["date"],
    how="left",
)

In [0]:
joined_df.show(n=5)  # triggers the join; observed run time: 2.12 minutes

# Connect to MongoDB
## Store aggregates in the database and re-read for machine learning later

In [0]:
database = 'msds697'
collection = 'msds697collection'
user_name = 'diane'
# The password is read from the environment rather than hardcoded. A password was previously
# hardcoded in this notebook; treat it as exposed and rotate it.
password = os.environ.get('MONGO_PASSWORD', '')
if not password:
    raise RuntimeError("Set the MONGO_PASSWORD environment variable before running this cell.")
address = 'cluster0.3itij.mongodb.net'
# Percent-encode the credentials: characters such as ':', '/', '@' or '%' would
# otherwise break the URI.
connection_string = (
    f"mongodb+srv://{quote_plus(user_name)}:{quote_plus(password)}"
    f"@{address}/{database}.{collection}"
)

In [0]:
# Show the target URI with the password masked so the credential never lands in saved output.
f"mongodb+srv://{user_name}:****@{address}/{database}.{collection}"

In [0]:
# Write the joined data to MongoDB. mode="append" means re-running this cell writes
# the same rows again rather than replacing them.
joined_df.write.save(format="mongo", mode="append", uri=connection_string)
# observed: 20.75 minutes -- ~7 GB data file size

In [0]:
# Re-read the stored collection from MongoDB for later machine-learning work.
df = spark.read.load(format="mongo", uri=connection_string)
# observed: 10.86 seconds read

In [0]:
df.show(n=20)  # observed: 10.76 seconds for the large ~7 GB e-commerce dataset