In [None]:
import configparser
from datetime import datetime, timedelta
import os
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, dayofweek
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.types import DateType,StringType
from pyspark.sql import functions as F
from pyspark.sql.functions import avg
import pandas as pd

In [None]:
# Create (or reuse) the notebook's Spark session. The cells below read their
# data through `spark`, so that name has to be bound here; `test_spark` is
# kept as an alias for anything that still refers to the original name.
spark = SparkSession.builder.getOrCreate()
test_spark = spark

In [None]:
# Read every parquet file in the current working directory as a sanity check.
# The glob pattern must be relative: os.path.join throws away all earlier
# components when a later one starts with '/', which would make the read
# target the filesystem root instead of the working directory.
check_df = spark.read.parquet(os.path.join(os.getcwd(), '*.parquet'))

In [None]:
# Load the city demographics JSON with the reader's "multiline" option
# switched on.
demographics_df = (
    spark.read
    .option("multiline","true")
    .json('sample_data/us-cities-demographics.json')
)

In [None]:
# Preview the raw demographics rows as loaded from the JSON file.
demographics_df.show()

In [None]:
# Spot-check a single city: display only the rows whose City is "New York".
demographics_df.where(col('City') == "New York").show()

In [None]:
# Create the demographics dimension table.
#
# The selected attributes are de-duplicated BEFORE the surrogate key is
# generated, so that identical city records share one row (and one city_id)
# instead of each copy receiving its own id and multiplying rows in later
# joins on city_name.
# NOTE(review): this presumes the raw file can repeat a city's attributes
# across rows that differ only in columns not selected here — confirm against
# the data. On already-unique input dropDuplicates() removes nothing.
demographics_table = (
    demographics_df
    .select(
        col('City').alias('city_name'),
        col('State').alias('state_name'),
        col('Median Age').alias('median_age'),
        col('Male Population').alias('male_population'),
        col('Female Population').alias('female_population'),
        col('Total Population').alias('total_population'),
        col('Number of Veterans').alias('num_veterans'),
        col('Foreign-born').alias('foreign_born'),
        col('Average Household Size').alias('avg_household'),
        col('State Code').alias('state_code'),
    )
    .dropDuplicates()
    # city_id stays the first column, followed by the attributes above.
    .select(monotonically_increasing_id().alias('city_id'), '*')
)



In [None]:
# Preview the demographics dimension table.
demographics_table.show()

In [None]:
# Load the temperature sample; header=True makes the reader take the column
# names from the first line of the CSV file.
temperatures_df = spark.read.csv(
    path='sample_data/temperatures_sample.csv',
    header=True,
)

In [None]:
# Preview the raw temperature rows as read from the CSV file.
temperatures_df.show()

In [None]:
# Clean the temperature readings in three steps:
#   1. discard rows that have no average temperature,
#   2. keep a single row per (dt, City, Country) combination,
#   3. restrict the data to cities in the United States.
temperatures_df = (
    temperatures_df
    .na.drop(subset=['AverageTemperature'])
    .dropDuplicates(['dt', 'City', 'Country'])
    .where(col('Country') == 'United States')
)

# Show the cleaned result.
temperatures_df.show()

In [None]:
# Count the rows currently in temperatures_df.
temperatures_df.count()

In [None]:
# Count the rows in the demographics dimension table.
demographics_table.count()

In [None]:
# Inner-join the temperature readings to the demographics table wherever the
# reading's City equals the table's city_name.
joined_df = temperatures_df.join(
    demographics_table,
    on=(temperatures_df.City == demographics_table.city_name),
    how='inner',
)

In [None]:
# Preview the rows produced by the temperatures/demographics inner join.
joined_df.show()

In [None]:
# Count the rows produced by the temperatures/demographics inner join.
joined_df.count()

In [None]:
# Display temperatures_df again; the join result was stored in joined_df, so
# temperatures_df itself was not reassigned by the join.
temperatures_df.show()

In [None]:
# Load the airport codes; header=True makes the reader take the column names
# from the first line of the CSV file.
airports_df = spark.read.csv(
    path='sample_data/airport-codes_csv.csv',
    header=True,
)

In [None]:
# Preview the raw airport rows as read from the CSV file.
airports_df.show()

In [None]:
# Keep only airports whose continent code is 'NA' and whose country is 'US'.
airports_df = airports_df.filter(
    (col('continent') == 'NA') & (col('iso_country') == 'US')
)


def extract_state_code(region_col):
    """Return a Column holding `region_col` without its first three characters.

    Built from Spark column expressions rather than a Python UDF, so a null
    region yields null instead of raising inside the UDF, and no rows are
    shipped to Python workers.

    Args:
        region_col: column name (str) or Column holding the region code.
            NOTE(review): presumed to look like 'US-CA', so the remainder is
            the state code — confirm against the data.

    Returns:
        Column with everything from the 4th character onward; an empty string
        when the value has fewer than four characters.
    """
    region = F.col(region_col) if isinstance(region_col, str) else region_col
    # substr positions are 1-based; a length of length(region) always reaches
    # the end of the string.
    return region.substr(F.lit(4), F.length(region))


# derive the state code from the region code
airports_df = airports_df.withColumn('state_code', extract_state_code('iso_region'))

# extract columns to create the airports table
airports_table = airports_df.select(
    col('ident').alias('airport_code'),
    'state_code',
    'type',
    'name',
    col('municipality').alias('city'),
)

In [None]:
# Restrict airports_df to rows with continent 'NA' and country 'US'.
is_north_america = col('continent') == 'NA'
is_united_states = col('iso_country') == 'US'
airports_df = airports_df.where(is_north_america).where(is_united_states)

In [None]:
# Display airports_df after the continent/country filters.
airports_df.show()

In [None]:
# Build the temperatures table: a generated row id plus the average
# temperature and city taken from each cleaned reading.
temperatures_table = temperatures_df.select(
    F.monotonically_increasing_id().alias('temperature_id'),
    F.col('AverageTemperature').alias('avg_temp'),
    F.col('City').alias('city'),
)

In [None]:
# Preview the temperatures table.
temperatures_table.show()

In [None]:
def extract_state_code(region_col):
    """Return a Column holding `region_col` without its first three characters.

    Uses Spark column expressions instead of a Python UDF: a null region
    yields null rather than raising, and no Python worker round-trip is needed.

    Args:
        region_col: column name (str) or Column holding the region code.
            NOTE(review): presumed to look like 'US-CA' — confirm.

    Returns:
        Column with everything from the 4th character onward.
    """
    region = F.col(region_col) if isinstance(region_col, str) else region_col
    # substr positions are 1-based; length(region) always reaches the end.
    return region.substr(F.lit(4), F.length(region))


# (Re)derive state_code on airports_df from iso_region.
airports_df = airports_df.withColumn('state_code', extract_state_code('iso_region'))

In [None]:
# Display airports_df, which now carries the derived state_code column.
airports_df.show()

In [None]:
# Inner-join temperatures to airports on the city name. Both tables expose it
# as `city`, so joining by column name keeps a single `city` column in the
# result instead of two ambiguous ones.
joined_2_df = temperatures_table.join(airports_table, on='city', how='inner')