# Step4-download external data & merge

## 1. Import necessary modules & start a Spark session

In [None]:
# Import necessary modules
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from urllib.request import urlretrieve
import os

In [None]:
# Create (or reuse) the Spark session used by every cell below
spark = (
    SparkSession.builder.appName('ADS_project_1.py')
    # Render DataFrames eagerly when they are the last expression of a cell
    .config('spark.sql.repl.eagerEval.enabled', True)
    .config('spark.sql.parquet.cacheMetadata', 'true')
    # Fix the session time zone so timestamp handling does not depend on the host
    .config('spark.sql.session.timeZone', 'Etc/UTC')
    .config('spark.driver.memory', '16g')
    # Fixed key: the Spark property is 'spark.executor.memory'; the previous
    # spelling 'spark.executer.memory' is not a Spark setting, so the 16g
    # executor memory request was never applied
    .config('spark.executor.memory', '16g')
    .getOrCreate()
)

## 2. External data download, import & overview

This `external_data` is about New York City weather, which we think can influence 'trip_duration'.

### 2.1 Create folders for `external_data`

In [None]:
# Data steps (pipeline layers) that each get an 'external_data' folder
data_step_list = ['landing', 'raw', 'curated']

# Create '../data/{data_step}/external_data' for every data step
for data_step in data_step_list:
    directory = f'../data/{data_step}/external_data'
    # exist_ok=True makes the call idempotent and avoids the check-then-create
    # race of testing os.path.exists() before os.makedirs()
    os.makedirs(directory, exist_ok=True)

### 2.2 Download `external_data` to the directory `data/landing/external_data/`

In [None]:
# Destination file for the download inside the landing folder
output_path = '../data/landing/external_data/external_data.csv'

# Bulk-dataset endpoint followed by its query string
# NOTE(review): the API key is embedded in the URL - consider keeping it out of the notebook
specific_data_url = ''.join([
    'https://weather.visualcrossing.com/VisualCrossingWebServices/rest/services/retrievebulkdataset',
    '?&key=5NFQXAGG9ENFS7A5S6G8G5ZXH&taskId=dc3f21a8e55c0d7422370cdc0979cb45&zip=false',
])

# Fetch the file; urlretrieve returns (local filename, response headers)
urlretrieve(specific_data_url, output_path)

### 2.3 Import `external_data` from directory `data/landing/external_data/`

In [None]:
# Path of the CSV file downloaded into the landing folder
external_data_dir = '../data/landing/external_data/external_data.csv'

# Read the CSV, using its first line as the column names
external_data = spark.read.csv(
    path=external_data_dir,
    header=True,
)

### 2.4 Show #rows, #cols & overview of `external_data`

In [None]:
# Shape of the freshly imported data: column count from the column list,
# row count from the count() action
original_num_cols = len(external_data.columns)
original_num_rows = external_data.count()

print('number of rows: ', original_num_rows)
print('number of cols: ', original_num_cols)

# Preview the first five records
external_data.limit(5)

## 3. Preprocessing for external data

### 3.1 Remove features we intuitively think are insignificant or that duplicate information from other features

Descriptive Statistics of features of `external_data`

In [None]:
# Summary statistics (count, mean, stddev, min, max) for the columns of `external_data`;
# kept as the cell's last expression so the notebook renders the result
external_data.describe()

Observe all the values of the two features 'conditions' & 'icon'. We find that most general info appears in these 2 features

In [None]:
# Print the distinct values of each descriptive weather feature in turn
for feature_name in ('conditions', 'icon'):
    external_data.select(feature_name).distinct().show()

Keep only the useful features

In [None]:
# Columns retained for the rest of the preprocessing
useful_feature_list = [
    'datetime',
    'feelslike',
    'visibility',
    'uvindex',
    'conditions',
]

# Project the DataFrame down to exactly those columns, in that order
external_data = external_data.select(*useful_feature_list)

Show data shape

In [None]:
# Shape of the data after the column selection
selected_num_rows = external_data.count()
selected_num_cols = len(external_data.columns)

print('number of rows: ', selected_num_rows)
print('number of cols: ', selected_num_cols)

# Preview the first five records
external_data.limit(5)

Save this raw `external_data` to the directory `data/raw/external_data/`

In [None]:
# Persist the column-reduced data to the raw folder, replacing any earlier output
external_data.write.parquet(
    '../data/raw/external_data/external_data.parquet',
    mode='overwrite',
)

Create new features from the existing columns

### 3.2 Create new features

In [None]:
# Handle to 'conditions' taken from the DataFrame as it is before any column is added
conditions_col = external_data['conditions']

# (new column name, boolean test on 'conditions'), listed in the order the
# columns are appended to the DataFrame
weather_flag_rules = [
    ('if_rain', conditions_col.contains('Rain')),
    ('if_snow', conditions_col.contains('Snow')),
    ('if_overcast', conditions_col.contains('Overcast')),
    (
        'if_cloudy',
        conditions_col.contains('Cloudy') | conditions_col.contains('Partially cloudy'),
    ),
    ('if_clear', conditions_col.contains('Clear')),
]

# 'date' holds the month and day of 'datetime', formatted as 'MM-dd'
external_data = external_data.withColumn('date', F.date_format('datetime', 'MM-dd'))

# Each flag column is 1 when its test holds and 0 otherwise
for flag_name, flag_test in weather_flag_rules:
    external_data = external_data.withColumn(flag_name, F.when(flag_test, 1).otherwise(0))

# Preview the first five records with the new columns
external_data.limit(5)

### 3.3 Changes for readability

Rename 2 features

In [None]:
# Old column name -> more readable replacement
renamed_features = {
    'feelslike': 'temperature',
    'uvindex': 'uv_index',
}

# Apply the renames one after another
for old_name, new_name in renamed_features.items():
    external_data = external_data.withColumnRenamed(old_name, new_name)

Delete features that have already been used to extract information and are no longer needed

In [None]:
# Final column set: the join key, the numeric weather readings, then the 0/1 flags.
# 'datetime' and 'conditions' are left out now that 'date' and the flags exist.
useful_feature_list = [
    'date',
    'temperature',
    'uv_index',
    'visibility',
    'if_rain',
    'if_snow',
    'if_overcast',
    'if_cloudy',
    'if_clear',
]
external_data = external_data.select(*useful_feature_list)

Show final data shape of `external_data`

In [None]:
# Shape of the fully preprocessed data
num_cols_after_preprocessing = len(external_data.columns)
num_rows_after_preprocessing = external_data.count()

print('number of rows: ', num_rows_after_preprocessing)
print('number of cols: ', num_cols_after_preprocessing)

Save curated `external_data` to the directory `data/curated/external_data/`

In [None]:
# Persist the preprocessed data to the curated folder, replacing any earlier output
external_data.write.parquet(
    '../data/curated/external_data/external_data.parquet',
    mode='overwrite',
)

## 4. Merge `TLC_data` & `external_data`

Import curated `TLC_data` from directory `data/curated/TLC_data/`

In [None]:
# Location of the curated TLC dataset (presumably written by an earlier step - verify)
TLC_data_path = '../data/curated/TLC_data/TLC_data.parquet/'
# Load it as a Spark DataFrame
TLC_data = spark.read.parquet(TLC_data_path)

Merge `TLC_data` & `external_data` based on the shared feature 'date'

In [None]:
# Left join on 'date': every TLC row is kept, and the weather columns are null
# where `external_data` has no row for that date
# NOTE(review): assumes `external_data` has one row per 'date'; duplicates would multiply TLC rows - confirm
merged_data = TLC_data.join(
    other=external_data,
    on='date',
    how='left',
)

`merged_data` overview

In [None]:
# Shape of the merged data
merged_num_cols = len(merged_data.columns)
merged_num_rows = merged_data.count()

print('#rows of merged_data: ', merged_num_rows)
print('#cols of merged_data: ', merged_num_cols)

# Preview the first five merged records
merged_data.limit(5)

Create folder for `merged_data`, and save it to directory `data/merged_data/`

In [None]:
# Folder that holds the merged dataset
directory = '../data/merged_data/'

# exist_ok=True makes the call idempotent and avoids the check-then-create
# race of testing os.path.exists() before os.makedirs()
os.makedirs(directory, exist_ok=True)

# Write '../data/merged_data/merged_data.parquet', replacing any earlier output;
# the path is derived from `directory` so the folder is spelled out only once
merged_data.write.mode('overwrite').parquet(os.path.join(directory, 'merged_data.parquet'))

## 5. Stop spark session

In [None]:
# Shut down the Spark session now that the raw, curated and merged outputs are written
spark.stop()