In [1]:
import os
from urllib.parse import quote_plus

import pandas as pd
import psycopg2
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
from sqlalchemy import create_engine
# Connection settings. Credentials are read from the standard libpq environment
# variables PGUSER / PGPASSWORD when set, so they do not have to live in the
# notebook; the fallbacks are the literal values previously written here.
# TODO: drop the fallbacks once the environment always provides the variables.
db_user = os.environ.get("PGUSER", "username")
db_password = os.environ.get("PGPASSWORD", "secret")

# quote_plus escapes characters such as '@', ':' or '/' that would otherwise
# corrupt the SQLAlchemy URL if they appeared in the user name or password.
engine = create_engine("postgresql+psycopg2://{}:{}@db:5432/database".format(
    quote_plus(db_user), quote_plus(db_password)))

# NOTE(review): `conn` / `cur` are not used in the cells visible here and the
# connection is never closed — confirm they are still needed.
conn = psycopg2.connect(database="database", user=db_user, password=db_password,
                        host="db", port="5432")
cur = conn.cursor()

# The PostgreSQL JDBC driver jar backs the spark.read / df.write "jdbc" cells.
spark = (
    SparkSession.builder
    .appName("Postgress")
    .config("spark.jars", "postgresql-42.2.18.jre7.jar")
    .getOrCreate()
)

# Introduction

My aim in this section is to analyze, and try to predict, the number of new COVID-19 cases in each US state on the next day. I will do this using the tables I prepared earlier: I am going to take the most important factors from each table and, based on them, build a predictive model.

In [82]:
# Load the `daily` table from PostgreSQL over JDBC.
# Credentials come from the PGUSER / PGPASSWORD environment variables, falling
# back to the literal defaults, instead of being typed into every cell.
target = (
    spark.read
    .format("jdbc")
    .option("url", "jdbc:postgresql://db:5432/database")
    .option("dbtable", "daily")
    .option("user", os.environ.get("PGUSER", "username"))
    .option("password", os.environ.get("PGPASSWORD", "secret"))
    .option("driver", "org.postgresql.Driver")
    .load()
)


In [83]:
# Load the `education` table from PostgreSQL over JDBC.
# Credentials come from the PGUSER / PGPASSWORD environment variables, falling
# back to the literal defaults, instead of being typed into every cell.
education = (
    spark.read
    .format("jdbc")
    .option("url", "jdbc:postgresql://db:5432/database")
    .option("dbtable", "education")
    .option("user", os.environ.get("PGUSER", "username"))
    .option("password", os.environ.get("PGPASSWORD", "secret"))
    .option("driver", "org.postgresql.Driver")
    .load()
)

In [84]:
# Load the `poverty` table from PostgreSQL over JDBC.
# Credentials come from the PGUSER / PGPASSWORD environment variables, falling
# back to the literal defaults, instead of being typed into every cell.
poverty = (
    spark.read
    .format("jdbc")
    .option("url", "jdbc:postgresql://db:5432/database")
    .option("dbtable", "poverty")
    .option("user", os.environ.get("PGUSER", "username"))
    .option("password", os.environ.get("PGPASSWORD", "secret"))
    .option("driver", "org.postgresql.Driver")
    .load()
)

In [85]:

# Load the `population` table from PostgreSQL over JDBC.
# Credentials come from the PGUSER / PGPASSWORD environment variables, falling
# back to the literal defaults, instead of being typed into every cell.
population = (
    spark.read
    .format("jdbc")
    .option("url", "jdbc:postgresql://db:5432/database")
    .option("dbtable", "population")
    .option("user", os.environ.get("PGUSER", "username"))
    .option("password", os.environ.get("PGPASSWORD", "secret"))
    .option("driver", "org.postgresql.Driver")
    .load()
)

# Aggregation

First, we need to aggregate all data to the state level so that the tables can be joined on a common state key.

In [86]:
# Roll rows up to state level: the state key is the first two characters of
# the FIPS code.
# NOTE(review): assumes FIPS is a zero-padded 5-character string; an integer
# column would lose the leading zero and mis-group states — confirm against
# the `education` table.
# NOTE(review): F.mean is an unweighted mean over rows, not a
# population-weighted state rate.
# NOTE: rebinds `education`; re-running this cell alone fails because the
# original columns have been replaced by the aggregates.
education = (
    education
    .withColumn('FIPS', F.col('FIPS').substr(1, 2))
    .groupBy('FIPS')
    .agg(
        F.mean('HS_plus'),
        F.mean('Bach_plus'),
        F.mean('Adanced_Degree'),  # spelling matches the table's column name
        F.sum('25Plus_Pop'),
    )
)

In [87]:
# Roll rows up to state level: the state key is the first two characters of
# the FIPS code. Population counts are summed, rates are averaged.
# No .drop() is needed afterwards: groupBy().agg() keeps only FIPS and the
# aggregate columns, so 'Total_Pop' is already absent (dropping it was a no-op).
# NOTE(review): F.mean is an unweighted mean over rows, not a
# population-weighted state rate.
# NOTE: rebinds `poverty`; re-running this cell alone fails because the
# original columns have been replaced by the aggregates.
poverty = (
    poverty
    .withColumn('FIPS', F.col('FIPS').substr(1, 2))
    .groupBy('FIPS')
    .agg(
        F.sum('Child_Pop'),
        F.sum('Elder_pop'),
        F.mean('Pov_Rate'),
        F.mean('Pov_Rate_Child'),
        F.mean('Pov_Rate_Elder'),
    )
)

In [88]:
# Sum the population counts per state and expose the state key as FIPS so it
# joins with the education and poverty frames.
# No .drop() is needed: after groupBy().agg() only State_Code and the sum(...)
# columns remain, so CTYNAME (and any FIPS column) are already gone — the old
# .drop('CTYNAME', 'FIPS') was a no-op.
# NOTE(review): assumes State_Code uses the same 2-character codes as the FIPS
# prefixes built for education/poverty, and that each county contributes one
# row (per-year or per-age-group rows would inflate the sums) — confirm
# against the `population` table.
population_count_columns = [
    'TOT_POP', 'TOT_MALE', 'TOT_FEMALE',
    'WA_MALE', 'WA_FEMALE', 'BA_MALE', 'BA_FEMALE',
    'IA_MALE', 'IA_FEMALE', 'AA_MALE', 'AA_FEMALE',
    'H_MALE', 'H_FEMALE',
]
population = (
    population
    .groupBy('State_Code')
    .agg(*[F.sum(column) for column in population_count_columns])
    .withColumnRenamed('State_Code', 'FIPS')
)

In [89]:
# Combine the three state-level tables on the shared FIPS key; the inner joins
# keep only FIPS values present in all three.
facts = (
    education
    .join(poverty, on='FIPS', how='inner')
    .join(population, on='FIPS', how='inner')
)

In [90]:
def state(x):
    """Return ``str(x)``, prefixed with '0' when it is exactly one character long.

    Turns single-digit state FIPS codes such as ``1`` into ``'01'``. Any value
    whose string form is not exactly one character (``''``, ``'12'``,
    ``'None'``, ...) is returned unchanged as a string.
    """
    code = str(x)
    return code if len(code) != 1 else '0' + code
# Python UDF wrapper around `state`, kept in case later cells still call it.
state_convert = F.udf(lambda x: state(x))

# Zero-pad the daily table's FIPS to two characters so it matches the
# substr(1, 2) keys built for education and poverty. The built-in lpad runs
# inside Spark (no per-row Python UDF round trip) and keeps NULL as NULL,
# whereas the UDF turned NULL into the literal string 'None'.
# NOTE(review): lpad also truncates to two characters, so this assumes FIPS
# holds integers or 1-2 character strings (a float column would render as
# '1.0') — confirm against the `daily` table.
target = target.withColumn("FIPS", F.lpad(F.col("FIPS").cast("string"), 2, "0"))

In [91]:
# Attach the state-level facts to every daily row; the inner join drops daily
# rows whose FIPS has no match in `facts`.
df = facts.join(target, on='FIPS', how='inner')

In [92]:
# Split off the hold-out day. Rows dated `target_date` become `target`, keeping
# only FIPS, state and positiveIncrease. Every other day stays in `df` as model
# input, without the `death` column.
# NOTE: this cell rebinds both `df` and `target`. Re-running it on its own
# filters the already-filtered `df` and leaves `target` empty — re-run the
# notebook from the top instead.
target_date = 20210301  # compared with the integer `date` column (presumably yyyymmdd)
cols = ['FIPS', 'state', 'positiveIncrease']

target = df.filter(df.date == target_date).select(cols)
df = df.filter(df.date != target_date).drop('death')

In [95]:
# Persist the hold-out rows to the `target` table, replacing it if it exists.
# Credentials come from the PGUSER / PGPASSWORD environment variables, falling
# back to the literal defaults, instead of being typed into every cell.
(
    target.write
    .format("jdbc")
    .mode('overwrite')
    .option("url", "jdbc:postgresql://db:5432/database")
    .option("dbtable", "target")
    .option("user", os.environ.get("PGUSER", "username"))
    .option("password", os.environ.get("PGPASSWORD", "secret"))
    .option("driver", "org.postgresql.Driver")
    .save()
)

In [96]:
# Persist the model-input frame to the `model_df` table, replacing it if it
# exists; the modeling section reads it back with pandas.
# Credentials come from the PGUSER / PGPASSWORD environment variables, falling
# back to the literal defaults, instead of being typed into every cell.
(
    df.write
    .format("jdbc")
    .mode('overwrite')
    .option("url", "jdbc:postgresql://db:5432/database")
    .option("dbtable", "model_df")
    .option("user", os.environ.get("PGUSER", "username"))
    .option("password", os.environ.get("PGPASSWORD", "secret"))
    .option("driver", "org.postgresql.Driver")
    .save()
)

# Modeling part

In this part I will build a model that uses all available data to predict the number of new COVID-19 cases on the next day.

In [99]:
# Read the persisted model-input table back into pandas and count missing
# values per column.
# NOTE: this rebinds `df` from the Spark DataFrame to a pandas DataFrame.
df = pd.read_sql_query(sql='select * from model_df', con=engine)
df.isna().sum()

FIPS                   0
avg(HS_plus)           0
avg(Bach_plus)         0
avg(Adanced_Degree)    0
sum(25Plus_Pop)        0
sum(Child_Pop)         0
sum(Elder_pop)         0
avg(Pov_Rate)          0
avg(Pov_Rate_Child)    0
avg(Pov_Rate_Elder)    0
sum(TOT_POP)           0
sum(TOT_MALE)          0
sum(TOT_FEMALE)        0
sum(WA_MALE)           0
sum(WA_FEMALE)         0
sum(BA_MALE)           0
sum(BA_FEMALE)         0
sum(IA_MALE)           0
sum(IA_FEMALE)         0
sum(AA_MALE)           0
sum(AA_FEMALE)         0
sum(H_MALE)            0
sum(H_FEMALE)          0
date                   0
state                  0
positiveIncrease       0
dtype: int64