# Data Sourcing

- Based on the Phase 3 requirements: pull the data for recent years and provide a clean dataset for Flights and/or Weather for the years 2020-2023.
- Save it as a Parquet file with the same structure as the raw datasets.

**TO DO:**
- Weather data is available at https://www.ncei.noaa.gov/data/local-climatological-data/archive/ up to 2024. 
- Data dictionary for weather: https://www.ncei.noaa.gov/pub/data/cdo/documentation/LCD_documentation.pdf
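The archive can be downloaded and unpacked in memory with `tarfile` and `BytesIO`, which are imported in the library cell below. This is a minimal sketch, assuming each yearly archive is a (possibly compressed) tar file containing one CSV per station, as observed when decompressing it manually. The helper names are hypothetical, and the exact archive URL for each year has to be taken from the NOAA directory listing. `urllib` is used here to keep the sketch dependency-free; `requests.get(url).content` would return the same bytes.

```python
import tarfile
import urllib.request
from io import BytesIO


def download_bytes(url: str) -> bytes:
    """Download a file into memory (hypothetical helper)."""
    with urllib.request.urlopen(url) as response:
        return response.read()


def extract_csv_members(archive_bytes: bytes) -> dict:
    """Return {member name: raw CSV bytes} for every .csv file in a tar archive.

    mode="r:*" lets tarfile detect gzip/bz2/xz compression or a plain tar.
    """
    csv_files = {}
    with tarfile.open(fileobj=BytesIO(archive_bytes), mode="r:*") as archive:
        for member in archive.getmembers():
            # Skip directories and non-CSV files
            if member.isfile() and member.name.lower().endswith(".csv"):
                csv_files[member.name] = archive.extractfile(member).read()
    return csv_files
```

Usage: `csv_files = extract_csv_members(download_bytes(archive_url))`, where `archive_url` is a placeholder for the yearly archive's URL; `len(csv_files)` should then equal the number of stations for that year.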

# Setup cluster


In [0]:
blob_container = "261storagecontainer"  
storage_account = "261storage" 
secret_scope = "261_team_6_1_spring24_scope"  
secret_key = "team_6_1_key"  
team_blob_url = f"wasbs://{blob_container}@{storage_account}.blob.core.windows.net" 


# blob storage is mounted here.
mids261_mount_path = "/mnt/mids-w261"

# SAS Token: Grant the team limited access to Azure Storage resources
spark.conf.set(
    f"fs.azure.sas.{blob_container}.{storage_account}.blob.core.windows.net",
    dbutils.secrets.get(scope=secret_scope, key=secret_key),
)

# see what's in the blob storage root folder
# display(dbutils.fs.ls(f"{team_blob_url}"))

# mount
data_BASE_DIR = "dbfs:/mnt/mids-w261/"
# display(dbutils.fs.ls(f"{data_BASE_DIR}"))


# Import libraries

In [0]:
#standard
import pandas as pd
import matplotlib.pyplot as plt
import pyspark.sql.functions as F #cleaning: split, col, when, lit, concat_ws,regexp_replace, regexp_extract
import seaborn as sns

#imputing
from pyspark.ml.feature import Imputer

#normalization and feature extraction
from pyspark.ml.feature import StandardScaler, VectorAssembler,PCA

# to download files from url
import requests
import tarfile
import os
from io import BytesIO

#map
# import folium


# Load raw data (weather)


In [0]:
# Raw weather data, 2015-2019
df_weather = spark.read.parquet(f"{data_BASE_DIR}datasets_final_project_2022/parquet_weather_data/")
# display(df_weather)

# Load new weather data (2020-2023)


In [0]:
# team_blob_url = "wasbs://261storagecontainer@261storage.blob.core.windows.net" (defined in the setup cell)
weather_20_df = spark.read.parquet(f"{team_blob_url}/weather_data_2020/weather_20_parquet")
weather_21_df = spark.read.parquet(f"{team_blob_url}/weather_data_2021/weather_21_parquet")
weather_22_df = spark.read.parquet(f"{team_blob_url}/weather_data_2022/weather_22_parquet")
weather_23_df = spark.read.parquet(f"{team_blob_url}/weather_data_2023/weather_23_parquet")
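The four reads above follow a single path pattern, so the paths could be generated rather than written out by hand. A minimal sketch, assuming the `weather_data_<year>/weather_<yy>_parquet` layout shown above holds for every year; `yearly_weather_paths` is a hypothetical helper.

```python
def yearly_weather_paths(base_url: str, years) -> dict:
    """Map each year to its Parquet folder: <base>/weather_data_<year>/weather_<yy>_parquet."""
    return {
        year: f"{base_url}/weather_data_{year}/weather_{year % 100:02d}_parquet"
        for year in years
    }


paths = yearly_weather_paths(
    "wasbs://261storagecontainer@261storage.blob.core.windows.net", range(2020, 2024)
)
# In the notebook (Spark session required), each path would then be read with:
# weather_by_year = {year: spark.read.parquet(path) for year, path in paths.items()}
```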

In [0]:
# Create a folder for downloaded files
dbutils.fs.mkdirs(f"{team_blob_url}/full_weather_data_2010_2023") 

# see team blob contents
# display(dbutils.fs.ls(f"{team_blob_url}/"))

# Combine all new weather files

In [0]:
# Add a variable YEAR, with the year for each dataframe (2020,2021,2022,2023)
weather_20_df = weather_20_df.withColumn("YEAR", F.lit(2020))
weather_21_df = weather_21_df.withColumn("YEAR", F.lit(2021))
weather_22_df = weather_22_df.withColumn("YEAR", F.lit(2022))
weather_23_df = weather_23_df.withColumn("YEAR", F.lit(2023))

In [0]:
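# Stack the four years; unionByName matches columns by name rather than by position,
# so a different column order in one yearly file cannot silently misalign the data
df_weather_2020_2023 = (
    weather_20_df
    .unionByName(weather_21_df)
    .unionByName(weather_22_df)
    .unionByName(weather_23_df)
)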

# Save

In [0]:
# Write the DataFrame as a Parquet file
df_weather_2020_2023.write.mode("overwrite").parquet("wasbs://261storagecontainer@261storage.blob.core.windows.net/full_weather_data_2010_2023/df_weather_2020_2023_parquet")

In [0]:
# Load checkpointed 2020-2023 weather
df_weather_2020_2023 = spark.read.parquet("wasbs://261storagecontainer@261storage.blob.core.windows.net/full_weather_data_2010_2023/df_weather_2020_2023_parquet")

# Sanity checks

1. Number of rows: numrows(df_weather_2020_2023) = numrows(weather_20_df) + numrows(weather_21_df) + numrows(weather_22_df) + numrows(weather_23_df). CHECK
2. No unexpected columns: compared with the old data, there are two new variables (DYTS, DYHF) plus one I added (source_file), and all new files have the same number of columns. CHECK
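Both checks reduce to arithmetic on row counts and set operations on column names. The sketch below works on plain Python values, which in the notebook would come from `.count()` and `.columns`; the helper names and the example values are hypothetical.

```python
def union_row_count_ok(part_counts, total_count) -> bool:
    """Check 1: a union without deduplication keeps every row, so the total equals the sum of the parts."""
    return sum(part_counts) == total_count


def column_differences(new_columns, old_columns) -> dict:
    """Check 2: list the columns that appear in only one of the two datasets."""
    new_set, old_set = set(new_columns), set(old_columns)
    return {
        "only_in_new": sorted(new_set - old_set),
        "only_in_old": sorted(old_set - new_set),
    }


# Made-up counts and column names, for illustration only
row_check = union_row_count_ok([10, 12, 9, 11], 42)
col_check = column_differences(
    ["STATION", "DATE", "DYTS", "DYHF", "source_file"], ["STATION", "DATE"]
)
```

With the real data, `only_in_new` is expected to list exactly `DYHF`, `DYTS` and `source_file`.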

In [0]:
print("-"*100)
print("NEW DATA")
print("-"*100)
print(f'2020: {weather_20_df.count()} rows and {len(weather_20_df.columns)} columns')
print(f'2021: {weather_21_df.count()} rows and {len(weather_21_df.columns)} columns')
print(f'2022: {weather_22_df.count()} rows and {len(weather_22_df.columns)} columns')
print(f'2023: {weather_23_df.count()} rows and {len(weather_23_df.columns)} columns')
print(f'2020-2023: {df_weather_2020_2023.count()} rows and {len(df_weather_2020_2023.columns)} columns')
print("-"*100)
print("OLD DATA")
print("-"*100)
print(f'2015-2019: {df_weather.count()} rows and {len(df_weather.columns)} columns')


## EDA: make sure the new file is similar to the previous raw weather file

1. Dimensions (see previous section)
2. Schema: typecast all columns to string (except for YEAR) to match the df_weather schema
3. Percentage of missing values per column
4. Unique stations: each year, the number of unique stations matches the number of source files
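For item 3, the two datasets can be compared on the share of missing values in each column. Below is a stdlib sketch over a list of row dictionaries; in Spark, the same fractions (multiply by 100 for a percentage) could be computed in one aggregation with `df.select([F.avg(F.col(c).isNull().cast("int")).alias(c) for c in df.columns])`. The helper name and sample rows are hypothetical, and treating empty strings as missing is an assumption (exposed as a parameter) because the raw columns are string-typed.

```python
def missing_percentage(rows, columns, blank_is_missing=True) -> dict:
    """Percentage of rows in which each column is None (or '' when blank_is_missing is True)."""
    if not rows:
        return {column: 0.0 for column in columns}
    result = {}
    for column in columns:
        missing = sum(
            1
            for row in rows
            if row.get(column) is None or (blank_is_missing and row.get(column) == "")
        )
        result[column] = 100.0 * missing / len(rows)
    return result


sample_rows = [  # made-up rows, for illustration only
    {"STATION": "A", "HourlyDryBulbTemperature": "20"},
    {"STATION": "B", "HourlyDryBulbTemperature": None},
    {"STATION": "C", "HourlyDryBulbTemperature": ""},
    {"STATION": "D", "HourlyDryBulbTemperature": "18"},
]
pct = missing_percentage(sample_rows, ["STATION", "HourlyDryBulbTemperature"])
```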

In [0]:
df_weather_2020_2023.printSchema()


In [0]:
df_weather.printSchema()


In [0]:
# remove variables from new dataset which are not found in the old one: 
to_drop = ['DYTS', 'DYHF']
df_weather_2020_2023 = df_weather_2020_2023.drop(*to_drop)

### Typecast to match schema

In [0]:
from pyspark.sql.types import StringType

# Cast every column except YEAR to string
# (casting a null yields a null string, so no separate null handling is needed)
string_cols = [c for c in df_weather_2020_2023.columns if c != 'YEAR']

# Create a new DataFrame with typecast columns
df_new = df_weather_2020_2023.select([
    F.col(c).cast(StringType()).alias(c) if c in string_cols else F.col(c)
    for c in df_weather_2020_2023.columns
])

df_new.printSchema()

### Unique stations

The 2020-2023 dataset has 993 fewer stations than the 2015-2019 dataset. There are several possible reasons:
- a) The missing stations no longer exist or no longer report their data (not verified, as this would take too much time).
- b) Uploading the CSV files to blob storage failed for some stations.
- c) The CSV-to-Parquet aggregation dropped some stations.

To test options b and c, I count the number of unique `STATION` values and check that it matches the number of unique `source_file` values: when I decompressed the tar file manually, it produced one CSV per station.

**CONCLUSION**
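Option a is the most likely: each year, the number of unique stations equals the number of unique source files, which in turn equals the number of files obtained by decompressing the archive manually (see the table below).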
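Equal counts of unique stations and unique source files are consistent with one file per station, but they do not by themselves prove a one-to-one mapping: a file containing two stations plus a station split across two files could still produce matching counts. A stricter check is that every `source_file` maps to exactly one `STATION` and vice versa. A sketch over `(STATION, source_file)` pairs, which in Spark could come from `df.select("STATION", "source_file").distinct().collect()`; the helper name and example pairs are hypothetical.

```python
from collections import defaultdict


def is_one_to_one(pairs) -> bool:
    """True if every station maps to exactly one source file and every file to exactly one station."""
    files_per_station = defaultdict(set)
    stations_per_file = defaultdict(set)
    for station, source_file in pairs:
        files_per_station[station].add(source_file)
        stations_per_file[source_file].add(station)
    return all(len(files) == 1 for files in files_per_station.values()) and all(
        len(stations) == 1 for stations in stations_per_file.values()
    )


# Made-up examples: both have 3 unique stations and 3 unique files
good = [("s1", "f1.csv"), ("s2", "f2.csv"), ("s3", "f3.csv")]
bad = [("s1", "f1.csv"), ("s1", "f2.csv"), ("s2", "f3.csv"), ("s3", "f3.csv")]
```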

In [0]:
print(f'Unique stations, 2015-2019: {df_weather.select("STATION").distinct().count()}')
print(f'Unique stations, 2020-2023: {df_weather_2020_2023.select("STATION").distinct().count()}')

# Find which stations from df_weather are not in df_weather_2020_2023
missing_stations = df_weather.select("STATION").subtract(df_weather_2020_2023.select("STATION"))
missing_stations_count = missing_stations.count()

print(f'Stations missing in 2020-2023 data: {missing_stations_count}')


- For each year, count the unique `source_file` values and the unique stations, and compare them with the number of files in the decompressed archive, to confirm that they are equal.

In [0]:
#unique source files
weather_20_count = weather_20_df.select('source_file').distinct().count()
weather_21_count = weather_21_df.select('source_file').distinct().count()
weather_22_count = weather_22_df.select('source_file').distinct().count()
weather_23_count = weather_23_df.select('source_file').distinct().count()

#unique stations
weather_20_station_count = weather_20_df.select('STATION').distinct().count()
weather_21_station_count = weather_21_df.select('STATION').distinct().count()
weather_22_station_count = weather_22_df.select('STATION').distinct().count()
weather_23_station_count = weather_23_df.select('STATION').distinct().count()

print(f'2020 has {weather_20_count} unique source_files')
print(f'2021 has {weather_21_count} unique source_files')
print(f'2022 has {weather_22_count} unique source_files')
print(f'2023 has {weather_23_count} unique source_files')

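# Check that each year has exactly as many source files as unique stations (it does)
assert weather_20_count == weather_20_station_count, "2020: source files != unique stations"
assert weather_21_count == weather_21_station_count, "2021: source files != unique stations"
assert weather_22_count == weather_22_station_count, "2022: source files != unique stations"
assert weather_23_count == weather_23_station_count, "2023: source files != unique stations"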


#### Number of files per year

|Year|Unique `source_file` values in DataFrame|Files after manual decompression|Unique stations|
|---|---|---|---|
|2020|13562|13562|13562|
|2021|13539|13539|13539|
|2022|13468|13468|13468|
|2023|13422|13422|13422|

### Plot all stations on a map to see if it resembles the raw 2015-2019 map

(It does; the output was cleared because rendering this many markers slowed the notebook down.)

In [0]:
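# folium is commented out in the library cell, so import it here (it must be installed on the cluster)
import folium

locations_df = df_weather_2020_2023.select("STATION", 'LATITUDE', 'LONGITUDE')
# Drop rows with missing values first, so a station is not lost when its first row lacks coordinates
locations_df = locations_df.na.drop()
locations_df = locations_df.dropDuplicates(subset=['STATION'])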

# Convert Spark DataFrame to Pandas DataFrame
locations_df_pd = locations_df.toPandas()

# Create a map centered around the world
world_map = folium.Map(location=[0, 0], zoom_start=2)

# Define a function to add markers
def add_marker(row):
    station = row.STATION
    latitude = row.LATITUDE
    longitude = row.LONGITUDE
    folium.Marker(
        [float(latitude), float(longitude)],
        popup=station,
    ).add_to(world_map)

# Apply the function to each row of the DataFrame
locations_df_pd.apply(add_marker, axis=1)

# Display the map
world_map

# Save final Parquet version to blob

In [0]:
df_weather_2020_2023_clean = df_weather_2020_2023.drop('source_file')

In [0]:
# Change the date format from e.g. 2023-01-03T20:00:00.000+00:00 to the 2015-2019 format, e.g. 2015-05-03T19:00:00
df_weather_2020_2023_clean = df_weather_2020_2023_clean.withColumn(
    "DATE",
    F.date_format("DATE", "yyyy-MM-dd'T'HH:mm:ss")
)
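`F.date_format` renders the timestamp as a string in the Spark session time zone, using the pattern `yyyy-MM-dd'T'HH:mm:ss`. The same conversion in plain Python is sketched below, assuming the session time zone is UTC (the sample values carry a `+00:00` offset); `to_lcd_string` is a hypothetical name, and `%Y-%m-%dT%H:%M:%S` is the `strftime` equivalent of the Spark pattern.

```python
from datetime import datetime, timezone


def to_lcd_string(iso_timestamp: str) -> str:
    """Convert e.g. '2023-01-03T20:00:00.000+00:00' to '2023-01-03T20:00:00'.

    Offset-aware inputs are first converted to UTC; naive inputs are kept as-is.
    """
    parsed = datetime.fromisoformat(iso_timestamp)
    if parsed.tzinfo is not None:
        parsed = parsed.astimezone(timezone.utc)
    return parsed.strftime("%Y-%m-%dT%H:%M:%S")
```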

In [0]:
print("Weather variables, 2015-2019")
print("-"*30)
df_weather.display()
print("Weather variables, 2020-2023")
print("-"*30)
df_weather_2020_2023_clean.display()

In [0]:
# Convert variable types in df_weather_2020_2023_clean to match the schema in df_weather
# (a single select avoids the overhead of calling withColumn once per column)
target_schema = df_weather.schema
df_weather_2020_2023_clean = df_weather_2020_2023_clean.select([
    F.col(column).cast(target_schema[column].dataType).alias(column)
    for column in df_weather_2020_2023_clean.columns
])


assert df_weather_2020_2023_clean.schema == df_weather.schema, "Schemas do not match"
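If the schema assertion fails, it helps to see which columns differ. A sketch of a hypothetical `schema_diff` helper that compares `{column: type}` mappings, which in PySpark can be built with `dict(df.dtypes)` (`DataFrame.dtypes` returns `(name, type string)` pairs). The assertion above compares full `StructType` objects, so it also checks column order and nullability, which this sketch does not.

```python
def schema_diff(new_types: dict, old_types: dict) -> dict:
    """Report columns missing on either side, and columns whose type strings differ."""
    shared = sorted(set(new_types) & set(old_types))
    return {
        "missing_in_new": sorted(set(old_types) - set(new_types)),
        "missing_in_old": sorted(set(new_types) - set(old_types)),
        "type_mismatch": {
            column: (new_types[column], old_types[column])
            for column in shared
            if new_types[column] != old_types[column]
        },
    }


# Usage in the notebook (Spark required):
# schema_diff(dict(df_weather_2020_2023_clean.dtypes), dict(df_weather.dtypes))
```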

In [0]:
assert len(df_weather_2020_2023_clean.columns) == len(df_weather.columns), "Number of columns does not match"

In [0]:
# Write the DataFrame as a Parquet file
df_weather_2020_2023_clean.write.mode("overwrite").parquet("wasbs://261storagecontainer@261storage.blob.core.windows.net/full_weather_data_2010_2023/df_weather_2020_2023_clean")

In [0]:
# Load checkpointed 2020-2023 weather
df_weather_2020_2023_clean = spark.read.parquet("wasbs://261storagecontainer@261storage.blob.core.windows.net/full_weather_data_2010_2023/df_weather_2020_2023_clean")
# df_weather_2020_2023_clean.display()

# Save to DBFS

- We have no write privileges on DBFS, so we will leave the files in our blob storage and share our credentials with Vini.

In [0]:
# Create a new directory (data_BASE_DIR already ends with "/")
dbutils.fs.mkdirs(f"{data_BASE_DIR}additional_data_sourcing/team_6_1")

In [0]:
# save
df_weather_2020_2023_clean.write.mode("overwrite").parquet("dbfs:/mnt/mids-w261/additional_data_sourcing/team_6_1/df_weather_2020_2023_clean")

In [0]:
# read
df_weather_2020_2023_clean = spark.read.parquet("dbfs:/mnt/mids-w261/additional_data_sourcing/team_6_1/df_weather_2020_2023_clean")
df_weather_2020_2023_clean.display()

# Delete all csv files from blob storage (pending review by team)


In [0]:
# Yearly folders that hold the raw CSV files (one path per list element)
folders = [
    f"{team_blob_url}/weather_data_2020/",
    f"{team_blob_url}/weather_data_2021/",
    f"{team_blob_url}/weather_data_2022/",
    f"{team_blob_url}/weather_data_2023/",
]
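Only the CSV files should be deleted, but each yearly folder also holds a Parquet folder that was read earlier (e.g. `weather_data_2020/weather_20_parquet`), so removing whole folders would also remove it. A safer approach is to select `.csv` paths explicitly before deleting anything. A sketch, assuming the CSVs sit directly inside each yearly folder; `csv_paths_to_delete` is hypothetical, and the destructive `dbutils.fs.rm` call is left commented out until the team review is done.

```python
def csv_paths_to_delete(listed_paths):
    """Keep only paths ending in .csv (case-insensitive); Parquet folders and other files are left alone."""
    return [path for path in listed_paths if path.lower().endswith(".csv")]


# In the notebook, the listing would come from dbutils.fs.ls (FileInfo objects have a .path attribute):
# for year in range(2020, 2024):
#     folder = f"{team_blob_url}/weather_data_{year}/"
#     for path in csv_paths_to_delete([f.path for f in dbutils.fs.ls(folder)]):
#         dbutils.fs.rm(path)
```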