In [1]:
### This program requires a local installation of APACHE SPARK to run
# Writing csv to Postgres/AWS Server

# Local Spark installation and the PostgreSQL JDBC driver inside its jars folder.
# Raw strings: the Windows backslashes ('\S', '\s', '\j', '\p') are not valid escape
# sequences, so in normal strings they raise DeprecationWarning (SyntaxWarning on 3.12+).
SPARK_HOME = r'C:\Spark\spark-3.2.2-bin-hadoop2.7'
POSTGRES_JDBC_JAR = SPARK_HOME + r'\jars\postgresql-42.5.0.jar'

# import libraries
import findspark

# Point Python at the local Spark install; this runs before `import pyspark` below.
findspark.init(SPARK_HOME)
import pyspark
from pyspark import SparkFiles
from pyspark.sql.functions import to_date, to_timestamp
from getpass import getpass


# Build Spark Session; `spark.jars` puts the PostgreSQL driver on the classpath for JDBC I/O.
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .appName("Qwest-Analytics-Dashboard-and-ML-Model")
    .config("spark.jars", POSTGRES_JDBC_JAR)
    .getOrCreate()
)

In [2]:
from pyspark.sql import SQLContext
sc = spark.sparkContext
# NOTE: SQLContext is deprecated since Spark 3.0 (the SparkSession `spark` covers its API);
# it is kept only so any later cell that refers to `sqlContext` keeps working.
sqlContext = SQLContext(sc)

# Show the effective Spark configuration, e.g. to confirm spark.jars picked up the JDBC driver.
# getConf() is the public accessor; `_conf` is a private attribute.
sc.getConf().getAll()



[('spark.app.name', 'Qwest-Analytics-Dashboard-and-ML-Model'),
 ('spark.app.initial.jar.urls',
  'spark://10.0.0.190:49213/jars/postgresql-42.5.0.jar'),
 ('spark.driver.host', '10.0.0.190'),
 ('spark.repl.local.jars',
  'file:///C:/Spark/spark-3.2.2-bin-hadoop2.7/jars/postgresql-42.5.0.jar'),
 ('spark.executor.id', 'driver'),
 ('spark.jars',
  'C:\\Spark\\spark-3.2.2-bin-hadoop2.7\\jars\\postgresql-42.5.0.jar'),
 ('spark.driver.memory', '16g'),
 ('spark.app.startTime', '1668733532559'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.sql.warehouse.dir',
  'file:/F:/Data%20Analytics/Final%20Project/Test%20Code/spark-warehouse'),
 ('spark.master', 'local[*]'),
 ('spark.submit.pyFiles', ''),
 ('spark.submit.deployMode', 'client'),
 ('spark.app.id', 'local-1668733533607'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.driver.port', '49213')]

In [3]:
# Import table into Spark Instance
# Folder holding the cleaned CSV exports; the database-write cells build their paths from it too.
resources_path = 'F:/Data Analytics/Final Project/Resources/'

# Read the CSV straight from disk. SparkFiles.get() is only meant for files shipped with
# SparkContext.addFile(); for an absolute path it just handed the same path back.
# No inferSchema, so every column is read as a string.
cleaned_minute_aggregation_df = spark.read.csv(
    f'{resources_path}cleaned_merged_minute_aggregate_data-2022-11-17.csv',
    sep=',',
    header=True,
)
# Drop the index column: presumably a pandas index saved with an empty header, which Spark names `_c0`.
cleaned_minute_aggregation_df = cleaned_minute_aggregation_df.drop('_c0')
cleaned_minute_aggregation_df.show()

+-------------------+-----------+--------------+------------------------------+--------+-----------+------------------+--------------------+------+
|               time|    country|total_sessions|total_session_duration_seconds|operator|    channel|cleaned_content_id|               genre|region|
+-------------------+-----------+--------------+------------------------------+--------+-----------+------------------+--------------------+------+
|2021-02-16 00:57:00|    Finland|           1.0|                          20.0|  Op_001|Channel_002|          PRO_1301|   OPERA , CLASSICAL|Europe|
|2021-02-16 01:22:00|    Morocco|           1.0|                          13.0|  Op_001|Channel_002|          PRO_1326|ORCHESTRA, CLASSI...|Africa|
|2021-02-16 05:03:00|    Germany|           1.0|                           2.0|  Op_001|Channel_002|          PRO_1355|ROMANTIC, CHAMBER...|Europe|
|2021-02-16 14:03:00|      Italy|           1.0|                          29.0|  Op_001|Channel_002|          PR

In [4]:
# Inspect the schema as read: the CSV was loaded without inferSchema, so every column is a string.
cleaned_minute_aggregation_df.printSchema()

root
 |-- time: string (nullable = true)
 |-- country: string (nullable = true)
 |-- total_sessions: string (nullable = true)
 |-- total_session_duration_seconds: string (nullable = true)
 |-- operator: string (nullable = true)
 |-- channel: string (nullable = true)
 |-- cleaned_content_id: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- region: string (nullable = true)



In [5]:
# Convert `time` to a timestamp and `total_sessions` / `total_session_duration_seconds` to double.
# With Spark's default (non-ANSI) settings, to_timestamp() and cast() return null instead of
# raising on unparseable strings, so compare non-null counts before and after the conversion
# to catch values that would otherwise be silently lost.
converted_columns = ['time', 'total_sessions', 'total_session_duration_seconds']
non_null_before = cleaned_minute_aggregation_df.agg({c: 'count' for c in converted_columns}).first().asDict()

cleaned_minute_aggregation_df = cleaned_minute_aggregation_df.withColumn('time', to_timestamp('time'))
cleaned_minute_aggregation_df = cleaned_minute_aggregation_df.withColumn('total_sessions', cleaned_minute_aggregation_df.total_sessions.cast('double'))
cleaned_minute_aggregation_df = cleaned_minute_aggregation_df.withColumn('total_session_duration_seconds', cleaned_minute_aggregation_df.total_session_duration_seconds.cast('double'))

non_null_after = cleaned_minute_aggregation_df.agg({c: 'count' for c in converted_columns}).first().asDict()
assert non_null_after == non_null_before, (
    f'type conversion turned values into nulls: before={non_null_before}, after={non_null_after}'
)

In [6]:
# Confirm the conversions produced the expected column types before the table is written to the database.
expected_types = {'time': 'timestamp', 'total_sessions': 'double', 'total_session_duration_seconds': 'double'}
actual_types = dict(cleaned_minute_aggregation_df.dtypes)
mismatched_types = {
    name: actual_types.get(name)
    for name, dtype in expected_types.items()
    if actual_types.get(name) != dtype
}
assert not mismatched_types, f'unexpected column types: {mismatched_types}'
cleaned_minute_aggregation_df.printSchema()

root
 |-- time: timestamp (nullable = true)
 |-- country: string (nullable = true)
 |-- total_sessions: double (nullable = true)
 |-- total_session_duration_seconds: double (nullable = true)
 |-- operator: string (nullable = true)
 |-- channel: string (nullable = true)
 |-- cleaned_content_id: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- region: string (nullable = true)



In [7]:
# Connecting to database and exporting data

# Database password: taken from the QWEST_DB_PASSWORD environment variable when it is set
# (allows non-interactive Restart & Run All); otherwise prompt for it so it never lands in the notebook.
import os
password = os.environ.get('QWEST_DB_PASSWORD') or getpass('Enter DataBase Password: ')

# Configure for RDS
# NOTE: 'append' adds rows to existing tables, so re-running the write cells inserts duplicate rows.
mode = 'append'
jdbc_url = "jdbc:postgresql://qwest-final-project.ccngkdwtiuvz.us-east-2.rds.amazonaws.com:5432/Qwest-Database"
# JDBC connection properties, passed as `properties` to the table writes.
config = {"user": "postgres",
          "password": password,
          "driver": "org.postgresql.Driver"}

WRITE TABLES TO DATABASE

In [8]:
# Load the cleaned viewership CSV and append it to the `cleaned_viewership_data` table.
# The path is read directly: SparkFiles.get() is meant for files added via SparkContext.addFile().
# No inferSchema, so every column reaches the database as a string.
clean_viewership_df = spark.read.csv(f'{resources_path}Cleaned_Viewership_Data_v4.csv', sep=',', header=True)
clean_viewership_df.write.jdbc(url=jdbc_url, table='cleaned_viewership_data', mode=mode, properties=config)

In [9]:
# Load the cleaned advertising CSV and append it to the `cleaned_advertising_data` table.
# The path is read directly: SparkFiles.get() is meant for files added via SparkContext.addFile().
# No inferSchema, so every column reaches the database as a string.
clean_advertising_df = spark.read.csv(f'{resources_path}Clean_Advertising_Data_v4.csv', sep=',', header=True)
clean_advertising_df.write.jdbc(url=jdbc_url, table='cleaned_advertising_data', mode=mode, properties=config)

In [10]:
# Push the typed minute-level aggregation to the `cleaned_merged_minute_aggregation` table in RDS.
# The save mode is set on the writer itself, which is equivalent to passing mode= to jdbc().
(
    cleaned_minute_aggregation_df.write
    .mode(mode)
    .jdbc(jdbc_url, 'cleaned_merged_minute_aggregation', properties=config)
)

CHECK THAT THE WRITTEN MINUTE-AGGREGATION TABLE CAN BE READ BACK

In [None]:
# check if you can import
# Read the table back through the same JDBC URL and connection properties used for the writes,
# rather than repeating the host and credentials here.
minute_aggregation_test_df = spark.read.jdbc(
    url=jdbc_url,
    table='cleaned_merged_minute_aggregation',
    properties=config,
)

In [None]:
# Print the first rows of the read-back table (values truncated to fit the console).
minute_aggregation_test_df.show(n=20, truncate=True)

+-------------------+-------------+--------------+------------------------------+-----------+--------+------------------+-------------------+
|               time|      country|total_sessions|total_session_duration_seconds|    channel|operator|cleaned_content_id|              genre|
+-------------------+-------------+--------------+------------------------------+-----------+--------+------------------+-------------------+
|2022-05-10 19:57:00|United States|          79.0|                        4276.0|Channel_002|  Op_009|          PRO_1759|ROMANTIC, ORCHESTRA|
|2022-05-10 19:57:00|  Puerto Rico|           3.0|                          30.0|Channel_002|  Op_009|          PRO_1759|ROMANTIC, ORCHESTRA|
|2022-05-10 19:57:00|       Canada|           1.0|                          60.0|Channel_002|  Op_009|          PRO_1759|ROMANTIC, ORCHESTRA|
|2022-05-10 19:58:00|United States|          89.0|                        4337.0|Channel_002|  Op_009|          PRO_1759|ROMANTIC, ORCHESTRA|
|2022-