# Initiating Spark session

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("merge_datasets").getOrCreate()

# Merging and pre-processing of our first dataset (df1, df1a and df1b): **"df1_merged"**

In [2]:
# Reading all the .csv files from our first dataset (df1 and related)
df1 = spark.read.csv("data1_US_Chronic_Respiratory_Disease_Mortality_Rates/data1/respiratory_disease_mortality_rate_usa.csv", header = True, inferSchema = True)
df1a = spark.read.csv("data1_US_Chronic_Respiratory_Disease_Mortality_Rates/data1/reference_location.csv", header = True, inferSchema = True)
df1b = spark.read.csv("data1_US_Chronic_Respiratory_Disease_Mortality_Rates/data1/reference_disease_name.csv", header = True, inferSchema = True)

In [3]:
df1.show() # respiratory_disease_mortality_rate_usa
print(f"df1 has {df1.count()} rows")

+-----------+----+--------+------+-------+----------------+----------------+----------------+
|location_id|fips|cause_id|sex_id|year_id|            rate|           lower|           upper|
+-----------+----+--------+------+-------+----------------+----------------+----------------+
|        523|   1|     508|     1|   1980|78.8240143049003|76.0416535564831|81.2860330254077|
|        523|   1|     508|     1|   1981|79.0443658945709|76.5087647940048|81.5276692567423|
|        523|   1|     508|     1|   1982|78.8112586372187|76.2767652989869|81.1914708668052|
|        523|   1|     508|     1|   1983|79.0766571203029|76.6822478099066|81.3736660066615|
|        523|   1|     508|     1|   1984|80.5325242474845|78.1203554139586|82.8761094707397|
|        523|   1|     508|     1|   1985|83.5481777042778|81.1573443449007|  85.70592128092|
|        523|   1|     508|     1|   1986|83.5863221866442|81.3655694934226|85.8497713841107|
|        523|   1|     508|     1|   1987|83.2647721314435|8

In [4]:
from pyspark.sql.functions import col

# The columns 'rate', 'lower' and 'upper' are defined as having 'double' type values, the creators of the dataframe did this in order to obtain values more previse than float values.
# We're converting these values to float so it later matches with float values that may appear down the line

# Convert column types from double to float
df1 = df1.withColumn("rate", col("rate").cast("float")) \
         .withColumn("lower", col("lower").cast("float")) \
         .withColumn("upper", col("upper").cast("float"))

df1.show()

+-----------+----+--------+------+-------+---------+---------+---------+
|location_id|fips|cause_id|sex_id|year_id|     rate|    lower|    upper|
+-----------+----+--------+------+-------+---------+---------+---------+
|        523|   1|     508|     1|   1980| 78.82401| 76.04166| 81.28603|
|        523|   1|     508|     1|   1981|79.044365|76.508766| 81.52767|
|        523|   1|     508|     1|   1982| 78.81126|76.276764| 81.19147|
|        523|   1|     508|     1|   1983| 79.07666| 76.68225|81.373665|
|        523|   1|     508|     1|   1984|80.532524| 78.12035| 82.87611|
|        523|   1|     508|     1|   1985| 83.54818| 81.15734|85.705925|
|        523|   1|     508|     1|   1986| 83.58632| 81.36557| 85.84977|
|        523|   1|     508|     1|   1987| 83.26477| 80.91308| 85.63398|
|        523|   1|     508|     1|   1988|84.932144| 82.64689|87.310555|
|        523|   1|     508|     1|   1989| 85.05759| 82.87069|87.518906|
|        523|   1|     508|     1|   1990| 84.55788

In [5]:
df1a.show() # reference_location
print(f"df1a has {df1a.count()} rows")

+-----------+----+--------------------+
|location_id|fips|       location_name|
+-----------+----+--------------------+
|        102|  NA|       United States|
|        523|   1|             Alabama|
|        524|   2|              Alaska|
|        525|   4|             Arizona|
|        526|   5|            Arkansas|
|        527|   6|          California|
|        528|   8|            Colorado|
|        529|   9|         Connecticut|
|        530|  10|            Delaware|
|        531|  11|District of Columbia|
|        532|  12|             Florida|
|        533|  13|             Georgia|
|        534|  15|              Hawaii|
|        535|  16|               Idaho|
|        536|  17|            Illinois|
|        537|  18|             Indiana|
|        538|  19|                Iowa|
|        539|  20|              Kansas|
|        540|  21|            Kentucky|
|        541|  22|           Louisiana|
+-----------+----+--------------------+
only showing top 20 rows

df1a has 3194 

In [6]:
df1b.show() # reference_disease_name
print(f"df1b has {df1b.count()} rows")

+--------+--------------------+
|cause_id|          cause_name|
+--------+--------------------+
|     508|Chronic respirato...|
|     509|Chronic obstructi...|
|     510|      Pneumoconiosis|
|     511|           Silicosis|
|     512|          Asbestosis|
|     513|Coal workers pneu...|
|     514|Other pneumoconiosis|
|     515|              Asthma|
|     516|Interstitial lung...|
|     520|Other chronic res...|
+--------+--------------------+

df1b has 10 rows


In [7]:
# Merges datasets 'df1' with 'df1a', resulting in the dataset 'df1_merged'

df1_merged = df1.join(df1a, on = "location_id", how = "inner")      # Merges datasets "df1" with "df1a" into a new dataset "df1_merged"
df1_merged = df1_merged.filter(df1_merged['location_id'] != '102')  # Filter out rows where 'fips' column has the value '102'
df1_merged = df1_merged.drop(df1a["fips"])                          # Removes the "fips" column that came from df1a, keeping the "fips" column from df1
df1_merged = df1_merged.join(df1b, on = "cause_id", how = "inner")  # Merges datasets "df1_merged" with "df1b"
df1_merged = df1_merged.withColumnRenamed("year_id", "Year")        # Renames column "year_id" to "Year"
df1_merged = df1_merged.filter(df1_merged["Year"] >= 2010)          # Removes all rows where values in the column "year_id" are less than 2010 

df1_merged.show()

+--------+-----------+----+------+----+---------+---------+---------+--------------+--------------------+
|cause_id|location_id|fips|sex_id|Year|     rate|    lower|    upper| location_name|          cause_name|
+--------+-----------+----+------+----+---------+---------+---------+--------------+--------------------+
|     508|        523|   1|     1|2010| 91.02748| 88.46226| 93.55213|       Alabama|Chronic respirato...|
|     508|        523|   1|     1|2011| 91.67503| 88.95623| 94.17292|       Alabama|Chronic respirato...|
|     508|        523|   1|     1|2012| 91.48417| 88.74958| 94.22716|       Alabama|Chronic respirato...|
|     508|        523|   1|     1|2013|92.234344|89.221306| 95.18036|       Alabama|Chronic respirato...|
|     508|        523|   1|     1|2014|92.042435| 88.97468| 95.04819|       Alabama|Chronic respirato...|
|     508|        523|   1|     2|2010| 57.94016|56.150833|59.768143|       Alabama|Chronic respirato...|
|     508|        523|   1|     2|2011|58.9402

In [8]:
from pyspark.sql.functions import when, concat, lit, length

# Filter values with string length 4 using a mask with regex
# Add a '0' to the beginning of the masked 'fips' column
df1_merged = df1_merged.withColumn("fips", 
                                   when(length(col("fips")) == 4, concat(lit("0"), col("fips")))
                                   .otherwise(col("fips")))

# Preserve only the rows which have a 5 digit fips
df1_merged = df1_merged.filter(length(col("fips")) == 5)

df1_merged.show()

+--------+-----------+-----+------+----+---------+---------+---------+--------------+--------------------+
|cause_id|location_id| fips|sex_id|Year|     rate|    lower|    upper| location_name|          cause_name|
+--------+-----------+-----+------+----+---------+---------+---------+--------------+--------------------+
|     508|        614|01001|     1|2010| 95.76898|89.471695|  102.695|Autauga County|Chronic respirato...|
|     508|        614|01001|     1|2011| 95.83572|88.289894|103.04964|Autauga County|Chronic respirato...|
|     508|        614|01001|     1|2012| 98.76378| 91.16536|106.14353|Autauga County|Chronic respirato...|
|     508|        614|01001|     1|2013| 95.33873|  87.9291|102.75713|Autauga County|Chronic respirato...|
|     508|        614|01001|     1|2014| 95.46123| 87.93561|102.96087|Autauga County|Chronic respirato...|
|     508|        614|01001|     2|2010| 66.27883| 61.72828|   70.939|Autauga County|Chronic respirato...|
|     508|        614|01001|     2|20

In [9]:
print("Rows before dropping duplicates:", df1_merged.count())

# Drop potential duplicates from df1_merged
df1_merged = df1_merged.dropDuplicates()

print("Rows after dropping duplicates:", df1_merged.count())

Rows before dropping duplicates: 437550
Rows after dropping duplicates: 437550


# Pre-processing of **df2**

In [10]:
# Reading the .csv file of our second dataset (df2)
df2 = spark.read.csv("data2_US_Pollution_Data/data2/pollution_us_2000_2016.csv", header = True, inferSchema = True)
df2.show()

+---+----------+-----------+--------+--------------------+-------+--------+-------+----------+-----------------+---------+-----------------+----------------+-------+-----------------+--------+----------------+---------------+------+-----------------+--------+-----------------+----------------+-------+-----------------+--------+----------------+---------------+------+
|_c0|State Code|County Code|Site Num|             Address|  State|  County|   City|Date Local|        NO2 Units| NO2 Mean|NO2 1st Max Value|NO2 1st Max Hour|NO2 AQI|         O3 Units| O3 Mean|O3 1st Max Value|O3 1st Max Hour|O3 AQI|        SO2 Units|SO2 Mean|SO2 1st Max Value|SO2 1st Max Hour|SO2 AQI|         CO Units| CO Mean|CO 1st Max Value|CO 1st Max Hour|CO AQI|
+---+----------+-----------+--------+--------------------+-------+--------+-------+----------+-----------------+---------+-----------------+----------------+-------+-----------------+--------+----------------+---------------+------+-----------------+--------+-

In [11]:
from pyspark.sql.types import DoubleType

# We have double type columns here in df2 again, lets convert them to float too
# Identify double type columns and cast them to float
double_columns = [field.name for field in df2.schema.fields if isinstance(field.dataType, DoubleType)]

for col_name in double_columns:
    df2 = df2.withColumn(col_name, col(col_name).cast("float"))

df2.show()

+---+----------+-----------+--------+--------------------+-------+--------+-------+----------+-----------------+---------+-----------------+----------------+-------+-----------------+--------+----------------+---------------+------+-----------------+--------+-----------------+----------------+-------+-----------------+--------+----------------+---------------+------+
|_c0|State Code|County Code|Site Num|             Address|  State|  County|   City|Date Local|        NO2 Units| NO2 Mean|NO2 1st Max Value|NO2 1st Max Hour|NO2 AQI|         O3 Units| O3 Mean|O3 1st Max Value|O3 1st Max Hour|O3 AQI|        SO2 Units|SO2 Mean|SO2 1st Max Value|SO2 1st Max Hour|SO2 AQI|         CO Units| CO Mean|CO 1st Max Value|CO 1st Max Hour|CO AQI|
+---+----------+-----------+--------+--------------------+-------+--------+-------+----------+-----------------+---------+-----------------+----------------+-------+-----------------+--------+----------------+---------------+------+-----------------+--------+-

In [12]:
# Check the number of rows before preprocessing
print("Rows before preprocessing:", df2.count())

# Modifying "State Code" and "County Code" values so they more closely match the US FIPS codes standard
from pyspark.sql.functions import expr, concat_ws

# Add leading zeros to "State Code" if it's a single digit
df2 = df2.withColumn("State Code", expr("LPAD(`State Code`, 2, '0')"))

# Add leading zeros to "County Code" based on the number of digits
df2 = df2.withColumn("County Code", 
                     expr("CASE WHEN length(`County Code`) = 1 THEN concat('00', `County Code`) " + 
                          "WHEN length(`County Code`) = 2 THEN concat('0', `County Code`) " + 
                          "ELSE `County Code` END"))

# Create a new column "fips" by concatenating "State Code" and "County Code"
df2 = df2.withColumn("fips", concat_ws("", df2["State Code"], df2["County Code"]))

# Preserve only the rows with 'fips' length == 5
df2 = df2.filter(length(col("fips")) == 5)

# Show the DataFrame after preprocessing
df2.show()

# If you want to check the number of rows after preprocessing
print("Rows after filtering out any existing (if any) fips values not equal to 5:", df2.count())

Rows before preprocessing: 1746661
+---+----------+-----------+--------+--------------------+-------+--------+-------+----------+-----------------+---------+-----------------+----------------+-------+-----------------+--------+----------------+---------------+------+-----------------+--------+-----------------+----------------+-------+-----------------+--------+----------------+---------------+------+-----+
|_c0|State Code|County Code|Site Num|             Address|  State|  County|   City|Date Local|        NO2 Units| NO2 Mean|NO2 1st Max Value|NO2 1st Max Hour|NO2 AQI|         O3 Units| O3 Mean|O3 1st Max Value|O3 1st Max Hour|O3 AQI|        SO2 Units|SO2 Mean|SO2 1st Max Value|SO2 1st Max Hour|SO2 AQI|         CO Units| CO Mean|CO 1st Max Value|CO 1st Max Hour|CO AQI| fips|
+---+----------+-----------+--------+--------------------+-------+--------+-------+----------+-----------------+---------+-----------------+----------------+-------+-----------------+--------+----------------+----

In [13]:
from pyspark.sql.functions import year

df2 = df2.withColumn("Year", year("Date Local")).drop("Date Local")    # Converts YYYY-MM-DD date format in the column "Date Local" into just "YYYY" (year only)

In [14]:
df2 = df2.filter((df2["Year"] <= 2014) & (df2['Year'] >= 2010))        # Removes rows where 'Year' is greater than 2014 and less than 2010 (Keeps rows in Year interval [2010; 2014])
df2 = df2.drop("_c0", "Site Num", "Address", "City")                   # Drop the columns that are no longer necessary for df2

# Replace "Parts per billion" with "ppb" in columns "NO2 Units", "SO2 Units"; Replace "Parts per million" with "ppm" in columns "O2 Units", "CO Units"
colvalues_replace = {
    "NO2 Units": "ppb",
    "SO2 Units": "ppb",
    "O3 Units": "ppm",
    "CO Units": "ppm"
}
for col_name, replacement_value in colvalues_replace.items():
    df2 = df2.withColumn(col_name, when(df2[col_name] == "Parts per billion", "ppb").otherwise(replacement_value))

df2.show()

+----------+-----------+-------+--------+---------+---------+-----------------+----------------+-------+--------+--------+----------------+---------------+------+---------+--------+-----------------+----------------+-------+--------+--------+----------------+---------------+------+-----+----+
|State Code|County Code|  State|  County|NO2 Units| NO2 Mean|NO2 1st Max Value|NO2 1st Max Hour|NO2 AQI|O3 Units| O3 Mean|O3 1st Max Value|O3 1st Max Hour|O3 AQI|SO2 Units|SO2 Mean|SO2 1st Max Value|SO2 1st Max Hour|SO2 AQI|CO Units| CO Mean|CO 1st Max Value|CO 1st Max Hour|CO AQI| fips|Year|
+----------+-----------+-------+--------+---------+---------+-----------------+----------------+-------+--------+--------+----------------+---------------+------+---------+--------+-----------------+----------------+-------+--------+--------+----------------+---------------+------+-----+----+
|        04|        013|Arizona|Maricopa|      ppb|32.208332|             48.0|              18|     45|     ppm|0.013

In [15]:
print(f"Rows before dropping duplicates: {df2.count()}") # Rows we have before dropping duplicates

# Drop duplicates from df2
df2 = df2.dropDuplicates()
df2.show()

print(f"Rows after dropping duplicates: {df2.count()}") # Rows we have after dropping the duplicates

Rows before dropping duplicates: 627594
+----------+-----------+----------+------------+---------+---------+-----------------+----------------+-------+--------+--------+----------------+---------------+------+---------+--------+-----------------+----------------+-------+--------+--------+----------------+---------------+------+-----+----+
|State Code|County Code|     State|      County|NO2 Units| NO2 Mean|NO2 1st Max Value|NO2 1st Max Hour|NO2 AQI|O3 Units| O3 Mean|O3 1st Max Value|O3 1st Max Hour|O3 AQI|SO2 Units|SO2 Mean|SO2 1st Max Value|SO2 1st Max Hour|SO2 AQI|CO Units| CO Mean|CO 1st Max Value|CO 1st Max Hour|CO AQI| fips|Year|
+----------+-----------+----------+------------+---------+---------+-----------------+----------------+-------+--------+--------+----------------+---------------+------+---------+--------+-----------------+----------------+-------+--------+--------+----------------+---------------+------+-----+----+
|        04|        013|   Arizona|    Maricopa|      ppb

# Final merging of our datasets and further pre-processing (if necessary): **"df_final"**

In [16]:
# Merge df1_merged and df2 on 'fips' and 'Year' columns
df_final = df1_merged.join(df2, on=['fips', 'Year'], how='inner')
df_final.show()

+-----+----+--------+-----------+------+-----------+------------+----------+---------------+--------------------+----------+-----------+----------+--------+---------+--------+-----------------+----------------+-------+--------+--------+----------------+---------------+------+---------+---------+-----------------+----------------+-------+--------+--------+----------------+---------------+------+
| fips|Year|cause_id|location_id|sex_id|       rate|       lower|     upper|  location_name|          cause_name|State Code|County Code|     State|  County|NO2 Units|NO2 Mean|NO2 1st Max Value|NO2 1st Max Hour|NO2 AQI|O3 Units| O3 Mean|O3 1st Max Value|O3 1st Max Hour|O3 AQI|SO2 Units| SO2 Mean|SO2 1st Max Value|SO2 1st Max Hour|SO2 AQI|CO Units| CO Mean|CO 1st Max Value|CO 1st Max Hour|CO AQI|
+-----+----+--------+-----------+------+-----------+------------+----------+---------------+--------------------+----------+-----------+----------+--------+---------+--------+-----------------+-----------

In [17]:
# Print schema of df_final to check column types
print("Schema of df_final:")
df_final.printSchema()

Schema of df_final:
root
 |-- fips: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- cause_id: integer (nullable = true)
 |-- location_id: integer (nullable = true)
 |-- sex_id: integer (nullable = true)
 |-- rate: float (nullable = true)
 |-- lower: float (nullable = true)
 |-- upper: float (nullable = true)
 |-- location_name: string (nullable = true)
 |-- cause_name: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- County Code: string (nullable = true)
 |-- State: string (nullable = true)
 |-- County: string (nullable = true)
 |-- NO2 Units: string (nullable = false)
 |-- NO2 Mean: float (nullable = true)
 |-- NO2 1st Max Value: float (nullable = true)
 |-- NO2 1st Max Hour: integer (nullable = true)
 |-- NO2 AQI: integer (nullable = true)
 |-- O3 Units: string (nullable = false)
 |-- O3 Mean: float (nullable = true)
 |-- O3 1st Max Value: float (nullable = true)
 |-- O3 1st Max Hour: integer (nullable = true)
 |-- O3 AQI: integer (nullable =

In [18]:
# # Reorganizing the columns so we have an easier time reading it (this function is defunct! However running it has no effect.)
# sortedcols = ['Year', 'fips', 'State Code', 'County Code', 'State', 'County', 
#               'location_name', 'cause_name', 'rate', 'upper', 'lower',
#               'NO2 Mean', 'NO2 1st Max Value', 'NO2 1st Max Hour', 'NO2 AQI', 'NO2 Units', 
#               'O3 Mean', 'O3 1st Max Value', 'O3 1st Max Hour', 'O3 AQI', 'O3 Units', 
#               'SO2 Mean', 'SO2 1st Max Value', 'SO2 1st Max Hour', 'SO2 AQI', 'SO2 Units', 
#               'CO Mean', 'CO 1st Max Value', 'CO 1st Max Hour', 'CO AQI', 'CO Units']

# When trying to perform later steps with all the columns included, errors arise, possibly due to the size of the dataset or perhaps values in specific columns that don't do well together.
# For this we will seperate our dataframe in two, export them to MongoDB separately and then join them together later in PowerBI.
sortedcols_part1 = ['State Code', 'County Code', 'State', 'County', 
              'location_name',
              'NO2 Mean', 'NO2 1st Max Value', 'NO2 1st Max Hour', 'NO2 AQI', 'NO2 Units', 
              'O3 Mean', 'O3 1st Max Value', 'O3 1st Max Hour', 'O3 AQI', 'O3 Units', 
              'SO2 Mean', 'SO2 1st Max Value', 'SO2 1st Max Hour', 'SO2 AQI', 'SO2 Units', 
              'CO Mean', 'CO 1st Max Value', 'CO 1st Max Hour', 'CO AQI', 'CO Units']

sortedcols_part2 = ['Year', 'fips', 'cause_name', 'rate', 'upper', 'lower']

df_final_part1 = df_final.select(sortedcols_part1)
df_final_part2 = df_final.select(sortedcols_part2)
df_final_part1 = df_final_part1.orderBy('Year', 'State Code', 'County Code') # Order the part 1 DataFrame by "Year", "State Code" and "County Code" columns
df_final_part2 = df_final_part2.orderBy('Year', 'fips') # Order the part 2 DataFrame by "Year" and 'fips' columns

df_final_part1.show()

+----------+-----------+-------+--------+---------------+---------+-----------------+----------------+-------+---------+--------+----------------+---------------+------+--------+--------+-----------------+----------------+-------+---------+--------+----------------+---------------+------+--------+
|State Code|County Code|  State|  County|  location_name| NO2 Mean|NO2 1st Max Value|NO2 1st Max Hour|NO2 AQI|NO2 Units| O3 Mean|O3 1st Max Value|O3 1st Max Hour|O3 AQI|O3 Units|SO2 Mean|SO2 1st Max Value|SO2 1st Max Hour|SO2 AQI|SO2 Units| CO Mean|CO 1st Max Value|CO 1st Max Hour|CO AQI|CO Units|
+----------+-----------+-------+--------+---------------+---------+-----------------+----------------+-------+---------+--------+----------------+---------------+------+--------+--------+-----------------+----------------+-------+---------+--------+----------------+---------------+------+--------+
|        04|        013|Arizona|Maricopa|Maricopa County|10.333333|             24.0|               7| 

In [19]:
df_final_part2.show()

+----+-----+----------+-----------+----------+----------+
|Year| fips|cause_name|       rate|     upper|     lower|
+----+-----+----------+-----------+----------+----------+
|2010|04013|Asbestosis|0.102107264|0.13413076|0.07225539|
|2010|04013|Asbestosis|0.102107264|0.13413076|0.07225539|
|2010|04013|Asbestosis|0.102107264|0.13413076|0.07225539|
|2010|04013|Asbestosis|0.102107264|0.13413076|0.07225539|
|2010|04013|Asbestosis|0.102107264|0.13413076|0.07225539|
|2010|04013|Asbestosis|0.102107264|0.13413076|0.07225539|
|2010|04013|Asbestosis|0.102107264|0.13413076|0.07225539|
|2010|04013|Asbestosis|0.102107264|0.13413076|0.07225539|
|2010|04013|Asbestosis|0.102107264|0.13413076|0.07225539|
|2010|04013|Asbestosis|0.102107264|0.13413076|0.07225539|
|2010|04013|Asbestosis|0.102107264|0.13413076|0.07225539|
|2010|04013|Asbestosis|0.102107264|0.13413076|0.07225539|
|2010|04013|Asbestosis|0.102107264|0.13413076|0.07225539|
|2010|04013|Asbestosis|0.102107264|0.13413076|0.07225539|
|2010|04013|As

In [20]:
print(f"Rows before dropping duplicates (part 1): {df_final_part1.count()}")

df_final_part1 = df_final_part1.dropDuplicates() # Drop duplicates from part 1 of final dataset

df_final_part1.show()

print(f"Rows after dropping duplicates (part 1): {df_final_part1.count()}")

Rows before dropping duplicates (part 1): 17244900
+----------+-----------+-----------+----------------+--------------------+---------+-----------------+----------------+-------+---------+--------+----------------+---------------+------+--------+---------+-----------------+----------------+-------+---------+--------+----------------+---------------+------+--------+
|State Code|County Code|      State|          County|       location_name| NO2 Mean|NO2 1st Max Value|NO2 1st Max Hour|NO2 AQI|NO2 Units| O3 Mean|O3 1st Max Value|O3 1st Max Hour|O3 AQI|O3 Units| SO2 Mean|SO2 1st Max Value|SO2 1st Max Hour|SO2 AQI|SO2 Units| CO Mean|CO 1st Max Value|CO 1st Max Hour|CO AQI|CO Units|
+----------+-----------+-----------+----------------+--------------------+---------+-----------------+----------------+-------+---------+--------+----------------+---------------+------+--------+---------+-----------------+----------------+-------+---------+--------+----------------+---------------+------+--------

In [21]:
print(f"Rows before dropping duplicates (part 2): {df_final_part2.count()}")

df_final_part2 = df_final_part2.dropDuplicates() # Drop duplicates from part 2 of final dataset

df_final_part2.show()

print(f"Rows after dropping duplicates (part 2): {df_final_part2.count()}")

Rows before dropping duplicates (part 2): 17244900
+----+-----+--------------------+------------+------------+------------+
|Year| fips|          cause_name|        rate|       upper|       lower|
+----+-----+--------------------+------------+------------+------------+
|2014|06059|Chronic respirato...|    35.14851|   36.423782|   33.915813|
|2014|06059|Other pneumoconiosis| 0.008034298|  0.01907339| 0.002567423|
|2014|06059|      Pneumoconiosis|  0.12829508|  0.20271909| 0.078745276|
|2014|06059|Interstitial lung...|    5.672771|   6.7352705|   3.3849025|
|2014|06059|              Asthma|   0.7473926|   0.8759542|  0.65121347|
|2014|06059|Interstitial lung...|   4.0091004|   4.8745923|   2.3248918|
|2014|06059|Coal workers pneu...| 0.007715228|  0.02498612|0.0019171194|
|2014|06059|          Asbestosis| 0.005963239|  0.01475996| 0.001976785|
|2014|06059|Coal workers pneu...|0.0039601624| 0.010622047|0.0011987275|
|2014|06059|      Pneumoconiosis|  0.15490195|  0.20428553|  0.12079911|


# Create .csv file
We will convert the pyspark dataframes into pandas dataframes so we're able to create the .csv files.

In [29]:
import pandas as pd

# Convert PySpark DataFrame to Pandas DataFrame
df_final_part1_pd = df_final_part1.toPandas()
df_final_part2_pd = df_final_part2.toPandas()

# Write Pandas DataFrames to CSV files
df_final_part1_pd.to_csv('bigdata_respiratory_part1.csv', index=False)
df_final_part2_pd.to_csv('bigdata_respiratory_part2.csv', index=False)

# PySpark -> MongoDB Connection

### PART 1 COLS ("Respiratory_part1cols")

In [22]:
import pandas as pd
from pymongo import MongoClient

# Convert PySpark DataFrame to RDD of dictionaries
rdd_data = df_final_part1.rdd.map(lambda row: row.asDict()) # Part 1 of final dataframe

# Initialize MongoDB client and select database/collection
client = MongoClient("mongodb://localhost:27017")

db = client["BigData"]

print(db.list_collection_names()) # Checks what collections are present in MongoDB

collection = db["Respiratory_part1cols"] # Prepares collection for part 1

['Respiratory_df1cols', 'Respiratory_part2cols', 'Respiratory', 'Respiratory_part1cols']


In [23]:
spark = SparkSession.builder \
    .appName("merge_datasets") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
    .getOrCreate()

In [24]:
# Writes part 1 of final dataframe into MongoDB
df_final_part1.write \
    .format("com.mongodb.spark.sql.DefaultSource") \
    .mode("overwrite") \
    .option("uri", "mongodb://localhost:27017/BigData.Respiratory_part1cols") \
    .save()

In [25]:
# add 3 example columns from merged_dataset to TEST if it works
#df_final.select('Year', 'fips', 'State Code', 'County Code', 'State', 'County').write \
#    .format("com.mongodb.spark.sql.DefaultSource") \
#    .mode("overwrite") \
#    .option("uri", "mongodb://localhost:27017/BigData.Respiratory") \
#    .save()

# Apache Spark and MongoDB Connector versions are compatible
# Java version is JDK 17, compatible and properly configured with the JAVA_HOME environment 
# pyspark shell is installed and properly configured with the SPARK_HOME environment
# mongo-spark-connector_2.12-3.0.1.jar file is in the C:\SPARK\jars directory
# mongo-java-driver-3.12.14.jar file is in the C:\SPARK\jars directory

### PART 2 COLS ("Respiratory_part2cols")

In [26]:
import pandas as pd
from pymongo import MongoClient

# Convert PySpark DataFrame to RDD of dictionaries
rdd_data = df_final_part2.rdd.map(lambda row: row.asDict()) # Part 2 of final dataframe

# Initialize MongoDB client and select database/collection
client = MongoClient("mongodb://localhost:27017")

db = client["BigData"]

print(db.list_collection_names()) # Checks what collections are present in MongoDB

collection = db["Respiratory_part2cols"] # Prepares collection for part 2

['Respiratory_df1cols', 'Respiratory_part2cols', 'Respiratory', 'Respiratory_part1cols']


In [27]:
spark = SparkSession.builder \
    .appName("merge_datasets") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
    .getOrCreate()

In [28]:
# Writes part 2 of final dataframe into MongoDB
df_final_part2.write \
    .format("com.mongodb.spark.sql.DefaultSource") \
    .mode("overwrite") \
    .option("uri", "mongodb://localhost:27017/BigData.Respiratory_part2cols") \
    .save()