# Data Consolidation



`load_data()`

#### Pyspark Imports


In [43]:
import pyspark
from pyspark.sql import SQLContext
from pyspark.sql.functions import hour, when, col, date_format, to_timestamp,format_string,lag
import pyspark.sql.functions as f

#### Define Spark Context


In [44]:
sc = pyspark.SparkContext(appName="Data_consolidation")
sqlContext = SQLContext(sc)

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=Data_consolidation, master=local[*]) created by __init__ at <ipython-input-3-700fce72828a>:1 

#### Function to load data

<span style="color:red">*Import Covid, Crime and SMB data*</span>

In [45]:
def load_data():
    bos = sqlContext.read.option("header",True).csv("boston_covid.csv")
    phoen= sqlContext.read.option("header",True).csv("Phoenix_covid.csv")
    denv= sqlContext.read.option("header",True).csv("denver_covid.csv")
    balti= sqlContext.read.option("header",True).csv("baltimore.csv")
    crime = sqlContext.read.option("header",True).csv("combined_crime_data.csv")
    city= sqlContext.read.option("header",True).csv("City_Level_data.csv")
    #county= sqlContext.read.option("header",True).csv("County_Level_data.csv")
    lookup= sqlContext.read.option("header",True).csv("uszips.csv")
    #all=sqlContext.read.option("header",True).csv("allzip_covid.csv")
    
    
    
    return bos,phoen,denv,balti,crime,city,lookup

In [46]:
boston,phoenix,denver,baltimore,crime,city,lookup= load_data()

boston.printSchema()
phoenix.printSchema()
denver.printSchema()
baltimore.printSchema()
lookup.printSchema()
city.printSchema()
crime.printSchema()





root
 |-- date: string (nullable = true)
 |-- state_name: string (nullable = true)
 |-- zip: string (nullable = true)
 |-- county_fips: string (nullable = true)
 |-- city: string (nullable = true)
 |-- cases: string (nullable = true)
 |-- deaths: string (nullable = true)

root
 |-- date: string (nullable = true)
 |-- state_name: string (nullable = true)
 |-- zip: string (nullable = true)
 |-- county_fips: string (nullable = true)
 |-- city: string (nullable = true)
 |-- cases: string (nullable = true)
 |-- deaths: string (nullable = true)

root
 |-- date: string (nullable = true)
 |-- state_name: string (nullable = true)
 |-- zip: string (nullable = true)
 |-- county_fips: string (nullable = true)
 |-- city: string (nullable = true)
 |-- cases: string (nullable = true)
 |-- deaths: string (nullable = true)

root
 |-- date: string (nullable = true)
 |-- state_name: string (nullable = true)
 |-- zip: string (nullable = true)
 |-- county_fips: string (nullable = true)
 |-- city: string (n

#### Function to convert Covid cases to daily numbers

<span style="color:red">*Use window ,lag functions*</span>

In [47]:
from pyspark.sql.window import Window
def daily_covid(df):
    df = df.withColumn('c', to_timestamp(col('date')))
    w = Window.partitionBy('zip').orderBy('c')
    df=df.withColumn('prev_cases',lag('cases').over(w))
    df=df.withColumn('daily_cases',col('cases')-col('prev_cases'))
    df=df.withColumn('prev_deaths',lag('deaths').over(w))
    df=df.withColumn('daily_deaths',col('deaths')-col('prev_deaths'))
    df=df.drop('c')
    return df

boston=daily_covid(boston)
phoenix=daily_covid(phoenix)
denver=daily_covid(denver)
baltimore=daily_covid(baltimore)



In [48]:
baltimore.show(3)
denver.show(3)
phoenix.show(3)
boston.show(3)

+----------+----------+-----+-----------+---------+-----+------+----------+-----------+-----------+------------+
|      date|state_name|  zip|county_fips|     city|cases|deaths|prev_cases|daily_cases|prev_deaths|daily_deaths|
+----------+----------+-----+-----------+---------+-----+------+----------+-----------+-----------+------------+
|2020-03-11|  Maryland|21250|      24005|Baltimore|    0|     0|      null|       null|       null|        null|
|2020-03-12|  Maryland|21250|      24005|Baltimore|    0|     0|         0|        0.0|          0|         0.0|
|2020-03-13|  Maryland|21250|      24005|Baltimore|    0|     0|         0|        0.0|          0|         0.0|
+----------+----------+-----+-----------+---------+-----+------+----------+-----------+-----------+------------+
only showing top 3 rows

+----------+----------+-----+-----------+------+-----+------+----------+-----------+-----------+------------+
|      date|state_name|  zip|county_fips|  city|cases|deaths|prev_cases|da

### Formate month, day and add date column in City File

In [49]:
city=city.withColumn('month',format_string("%02d", col('month').cast('Integer')))
city=city.withColumn('day',format_string("%02d", col('day').cast('Integer')))

city=city.withColumn('date',f.concat(f.col('year'),f.lit('-'), f.col('month'),f.lit('-'),f.col('day')))
city.show(3)

+----+-----+---+------+---------+---------+-------------+--------------+--------------+--------------+--------------+------------+------------+------------+------------+------------+----+---------+---------+---------+---------+---------+---------+---------+----------------------+-----------------------+----------+
|year|month|day|cityid|     City|   County|merchants_all|merchants_ss40|merchants_ss60|merchants_ss65|merchants_ss70| revenue_all|revenue_ss40|revenue_ss60|revenue_ss65|revenue_ss70|_c16|spend_acf|spend_aer|spend_all|spend_apg|spend_grf|spend_hcs|spend_tws|spend_retail_w_grocery|spend_retail_no_grocery|      date|
+----+-----+---+------+---------+---------+-------------+--------------+--------------+--------------+--------------+------------+------------+------------+------------+------------+----+---------+---------+---------+---------+---------+---------+---------+----------------------+-----------------------+----------+
|2020|   01| 10|     5|  Phoenix| Maricopa|      0.0

### Merging Covid, Crime and City Files.

In [50]:

lu=lookup.withColumn("County",col("county_name")).drop('county_name')
lu=lu.select('County','zip')

b_county=boston.join(lu,'zip','inner')
b_zip=b_county
b_county=b_county.withColumn('cases',col('cases').cast('Integer')).withColumn('deaths',col('deaths').cast('Integer'))
b_county=b_county.groupBy(['date','County']).agg(f.sum('cases').alias('cases'),f.sum('deaths').alias('deaths')).\
withColumn('city',f.lit('Boston'))

d_county=denver.join(lu,'zip','inner')

d_zip=d_county
d_county=d_county.withColumn('cases',col('cases').cast('Integer')).withColumn('deaths',col('deaths').cast('Integer'))
d_county=d_county.groupBy(['date','County']).agg(f.sum('cases').alias('cases'),f.sum('deaths').alias('deaths')).\
withColumn('city',f.lit('Denver'))

p_county=phoenix.join(lu,'zip','inner')

p_zip=p_county
p_county=p_county.withColumn('cases',col('cases').cast('Integer')).withColumn('deaths',col('deaths').cast('Integer'))
p_county=p_county.groupBy(['date','County']).agg(f.sum('cases').alias('cases'),f.sum('deaths').alias('deaths')).\
withColumn('city',f.lit('Phoenix'))

m_county=baltimore.join(lu,'zip','inner')
m_zip=m_county
m_county=m_county.withColumn('cases',col('cases').cast('Integer')).withColumn('deaths',col('deaths').cast('Integer'))
m_county=m_county.groupBy(['date','County']).agg(f.sum('cases').alias('cases'),f.sum('deaths').alias('deaths')).\
withColumn('city',f.lit('Baltimore'))

crime=crime.withColumn('zip', format_string("%05d", col('zip').cast('Integer')))
Crime=crime.fillna('0').withColumn('zip', format_string("%05d", col('zip').cast('Integer')))
Crime=Crime.join(lu,'zip','inner')
Crime_zip=Crime.withColumn('date',col('crimedate')).drop('crimedate').withColumn('city_crime',col('city')).drop('city')
Crime=Crime.groupBy(['crimedate','County','city']).agg(f.sum('Non-Violent').alias('Non-Violent'),f.sum('Violent').alias('Violent'))\
.withColumn('date',col('crimedate')).drop('crimedate').withColumn('city_crime',col('city')).drop('city')
covid=m_county.union(b_county).union(p_county).union(d_county)
covid=covid.withColumn('city_covid',col('city')).drop('city')
crime_covid=Crime.join(covid,['date','County'],'outer')
covid_zip=b_zip.union(d_zip).union(p_zip).union(m_zip)
covid_zip=covid_zip.withColumn('city_covid',col('city')).drop('city')

Combined_covid_crime_zip=Crime_zip.join(covid_zip,['date','zip','County'],'outer')




###  Converting County level data to Zip level

In [51]:
all=lookup.groupBy('state_name','county_name').agg(f.sum('population').alias('pops'))
all_zip=lookup.join(all,['state_name','county_name'],'inner')

all_zip=all_zip.withColumn('prop',col('population')/col('pops')).select('zip','county_name','prop').withColumn('County',col('county_name')).drop('county_name')

city_zip=city.join(all_zip,'County','inner')
combined_city_zip_inner=city_zip.join(Combined_covid_crime_zip,['date','zip','County'],'inner')
combined_city_zip_inner.show(3)


+----------+-----+--------+----+-----+---+------+-------+-------------+--------------+--------------+--------------+--------------+-----------+------------+------------+------------+------------+----+---------+---------+---------+---------+---------+---------+---------+----------------------+-----------------------+--------------------+------------------+-----+--------+--------------+------------+-----+----------+--------+-------+-------------------+----+-------+--------+--------------------+-----------+---------+-----------+-------+----------+----------+-----------+-----+------+----------+-----------+-----------+------------+----------+
|      date|  zip|  County|year|month|day|cityid|   City|merchants_all|merchants_ss40|merchants_ss60|merchants_ss65|merchants_ss70|revenue_all|revenue_ss40|revenue_ss60|revenue_ss65|revenue_ss70|_c16|spend_acf|spend_aer|spend_all|spend_apg|spend_grf|spend_hcs|spend_tws|spend_retail_w_grocery|spend_retail_no_grocery|                prop|Aggravated Assau

### Saving Combined City SMB, Crime, Covid data at Zip level

In [53]:
combined_city_zip_inner.repartition(1).write.csv("city_zip_inner_final", header= 'true')
