# Advanced Certification Program in Computational Data Science
## A program by IISc and TalentSprint
### Mini-Project: Implementation of Linear Regression on a Large Dataset Using Dask Library

**DISCLAIMER:** THIS NOTEBOOK IS PROVIDED ONLY AS A REFERENCE SOLUTION NOTEBOOK FOR THE MINI-PROJECT. THERE MAY BE OTHER POSSIBLE APPROACHES/METHODS TO ACHIEVE THE SAME RESULTS.

## Learning Objectives

At the end of the mini-project, you will be able to:

- understand how Dask handles large datasets compared with pandas dataframes
- perform exploratory data analysis on a large dataset (2 million rows) using Dask
- implement a linear regression model using the Dask-ML library and make predictions


## Problem Statement

Predict the taxi fare amount in New York City using Dask-ML.

## Information

### Dask 
[Dask](https://dask.pydata.org/en/latest/) is an open-source project that provides abstractions over NumPy arrays, pandas dataframes, and regular lists, allowing you to run operations on them in parallel using multicore processing.

We can summarize the basics of Dask as follows:

* processes data that doesn’t fit into memory by breaking it into blocks and specifying task chains

* parallelizes execution of tasks across cores and even nodes of a cluster

* moves computation to the data rather than the other way around, to minimize communication overhead
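
The first point can be illustrated without Dask at all. The sketch below is only a toy model of the idea, not how Dask is implemented: it splits a list into blocks, runs a small task on each block, and combines the partial results into a global mean. The function names and the fare values are made up for illustration.

```python
from typing import Iterator, List, Tuple


def iter_blocks(values: List[float], block_size: int) -> Iterator[List[float]]:
    """Yield consecutive blocks of at most `block_size` values."""
    for start in range(0, len(values), block_size):
        yield values[start:start + block_size]


def block_summary(block: List[float]) -> Tuple[float, int]:
    """Per-block task: return (sum, count) for one block."""
    return sum(block), len(block)


def blocked_mean(values: List[float], block_size: int) -> float:
    """Combine the per-block (sum, count) pairs into a global mean."""
    total, count = 0.0, 0
    for block in iter_blocks(values, block_size):
        block_sum, block_count = block_summary(block)
        total += block_sum
        count += block_count
    return total / count


fares = [4.5, 16.9, 5.7, 7.7, 5.3, 12.1, 7.5]  # made-up fare values
result = blocked_mean(fares, block_size=3)
print(result)
```

Only one block needs to be in memory at a time, and the per-block tasks are independent, so they could run on different cores.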

### Dataset

The dataset is based on the 2016 NYC Yellow Cab trip record data made available in Big Query on Google Cloud Platform. Its variables are as follows:
![Dataset](https://cdn.iisc.talentsprint.com/CDS/Images/NYC_Taxi_data_description.png)




## Grading = 10 Points

In [None]:
#@title Install Dask dependencies and restart runtime
!pip -qq install dask-ml==1.8.0
!pip -qq install dask==2.9.1
!pip -qq install dask[delayed]
!pip -qq install dask[dataframe] --upgrade

#### Importing Necessary Packages

In [None]:
import warnings
warnings.filterwarnings('ignore')
import time
import dask
import dask.dataframe as dd
import dask.array as da
from dask_ml.linear_model import LinearRegression
from dask_ml.model_selection import train_test_split
from dask_ml.metrics import mean_squared_error, r2_score
from dask.distributed import Client, progress
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
%matplotlib inline

# client = Client()

In [None]:
#@title Download the data
!wget -qq https://cdn.iisc.talentsprint.com/CDS/MiniProjects/Dask_MP_dataset.csv

#### Exercise 1: Read the dataset using dask library and compare the time of execution with pandas library.

**Hint:** pass `dtype` for passenger_count as `int64`

In [None]:
%%time
ddf = dd.read_csv('/content/Dask_MP_dataset.csv', dtype={'passenger_count': 'int64'})

#### Use pandas to read the dataset and compare the time taken

In [None]:
%%time
df_pd = pd.read_csv('/content/Dask_MP_dataset.csv')

In [None]:
%%time
ddf.head()

### Data Analysis

#### Exercise 2: Drop the unnecessary columns. Also drop the duplicate rows and the rows having null values.

**Hint:** Drop those columns which are not useful in EDA as well as model implementation

In [None]:
ddf = ddf.drop(["key", "Unnamed: 0"],axis=1)

In [None]:
%%time
# Drop duplicate rows (%%time must be the first line of the cell)
ddf = ddf.drop_duplicates()

In [None]:
# ddf['pickup_datetime'] = dask.dataframe.to_datetime(ddf['pickup_datetime'])
# ddf['year'] = ddf['pickup_datetime'].dt.year
# ddf.head()

In [None]:
%%time
ddf = ddf.dropna()

In [None]:
%%time
# Drop duplicate rows in pandas dataframe (%%time must be the first line of the cell)
df_pd = df_pd.drop_duplicates()

In [None]:
%%time
df_pd = df_pd.dropna()

In [None]:
%%time
ddf.groupby("passenger_count").fare_amount.mean().compute()

In [None]:
%%time
df_pd.groupby("passenger_count").fare_amount.mean()

In [None]:
%%time 
ddf[["fare_amount"]].mean().compute()

In [None]:
%%time 
df_pd[["fare_amount"]].mean()

#### Exercise 3: Visualize the target variable, i.e., `fare_amount`, to study the fare distribution using a histogram density plot. Analyze the `fare_amount` distribution and visualize it for the range [0, 60].

**Hint:** [sns.histplot()](https://stackoverflow.com/questions/51027636/seaborn-histogram-with-bigdata/51027895) and use `.between` to plot the graph for the given range


In [None]:
# exploring data

def plot_dist(series=ddf["fare_amount"], title="Fare Distribution"):
  # seaborn expects in-memory data, so materialize a lazy dask series first
  if hasattr(series, "compute"):
    series = series.compute()
  sns.histplot(series, kde=True, stat='density', discrete=True)
  sns.despine()
  plt.title(title)
  plt.show()
plot_dist()

In [None]:
#dropping absurd values and plotting fare amount in the range [0, 60]
ddf = ddf[ddf.fare_amount.between(0,60)]
plot_dist(ddf.fare_amount)

#### Observe the number of workers and cores running on your machine

Initialize a client and observe how many workers are running and how many cores are used for the given dataset.

In [None]:
# Initializing a client
# client = Client(processes=False)
# client

With the client cell above uncommented, its output shows the number of workers and cores in use.

### EDA based on Time

#### Exercise 4: Extract day of the week (dow), hour, month and year from `pickup_datetime`.

**Hint:** use the `pd.to_datetime()` function on the computed pandas dataframe. Dask also provides `dd.to_datetime()`, but this solution does the conversion in pandas.

Remember to call `.compute()` on the dask dataframe when passing it to the function defined below.
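
To make the four target features concrete, here is a minimal standard-library sketch for a single timestamp. It assumes the raw value looks like `"YYYY-MM-DD HH:MM:SS UTC"`, which is an assumption about the CSV rather than something stated above; `time_features` is a hypothetical helper. As in pandas, `weekday()` counts Monday as 0.

```python
from datetime import datetime


def time_features(pickup: str) -> dict:
    """Extract dow, hour, month and year from one pickup timestamp string."""
    # Assumed input format: "YYYY-MM-DD HH:MM:SS UTC"
    stamp = datetime.strptime(pickup.replace(" UTC", ""), "%Y-%m-%d %H:%M:%S")
    return {
        "dow": stamp.weekday(),  # Monday = 0, ..., Sunday = 6
        "hour": stamp.hour,
        "month": stamp.month,
        "year": stamp.year,
    }


features = time_features("2016-01-01 17:26:21 UTC")  # made-up timestamp
print(features)
```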

In [None]:
# pickup_datetime feature

def extract_time_features(ddf):
    timezone_name = 'America/New_York'
    time_column = "pickup_datetime"
    ddf.index = pd.to_datetime(ddf[time_column])
    #ddf.index = ddf.index.tz_convert(timezone_name)
    ddf["dow"] = ddf.index.weekday
    ddf["hour"] = ddf.index.hour
    ddf["month"] = ddf.index.month
    ddf["year"] = ddf.index.year
    return ddf.reset_index(drop=True)

In [None]:
ddf = extract_time_features(ddf.compute())

In [None]:
ddf.head()

#### Exercise 5: a.) Plot the taxi trips by hour of the day

* Partition the data into segments using `dd.from_pandas()`

* Plot the number of taxi trips for each hour of the day. **Hint:** [sns.catplot](https://seaborn.pydata.org/generated/seaborn.catplot.html)

In [None]:
ddf = dd.from_pandas(ddf, npartitions=3)

In [None]:
type(ddf)

In [None]:
# distribution of taxi trips by hour of the day

sns.catplot(x="hour", kind="count", palette="icefire", data=ddf.compute(), height=5, aspect=3);
sns.despine()
plt.title('Hour of Day');
plt.show()

#### Exercise 5: b.) Plot the distribution of taxi trips by day of the week (dow)

In [None]:
# distribution of taxi trips by day of the week

sns.catplot(x="dow", kind="count", palette="icefire", data=ddf.compute(), height=5, aspect=3);
sns.despine()
plt.title('Day of Week');
plt.show()

#### Exercise 6: a.) Draw a plot between the target variable and passenger count and analyze it. 

In [None]:
#passenger count feature

sns.catplot(x="passenger_count", y="fare_amount", palette="icefire", data=ddf.compute(), kind="bar", aspect=3)
sns.despine()
plt.show()

#### Exercise 6: b.) Draw a plot between the target variable and hour and analyze it. 

In [None]:
#fare amount by hour

sns.catplot(x="hour", y="fare_amount", palette="icefire", data=ddf.compute(), kind="bar", aspect=3)
sns.despine()
plt.show()

#### Exercise 7: Compute the Haversine distance between samples

* Convert the latitude and longitude coordinates to radians

* Calculate the Haversine distance

  **Hint:** [haversine_distances](https://scikit-learn.org/stable/modules/generated/sklearn.metrics.pairwise.haversine_distances.html)

* Add the "distance" feature to the dataset and plot its distribution
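
Before vectorizing, it can help to check the formula on a single pair of points. The sketch below is a scalar, standard-library version under the same assumption of a spherical earth with radius 6371 km; `haversine_km` is a hypothetical helper name. One degree of latitude along a meridian should come out at roughly 111.19 km.

```python
import math

EARTH_RADIUS_KM = 6371.0


def haversine_km(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Great-circle distance in km between two points given in decimal degrees."""
    # Step 1: convert degrees to radians
    lat1, lon1, lat2, lon2 = map(math.radians, (lat1, lon1, lat2, lon2))
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    # Step 2: haversine formula
    a = math.sin(dlat / 2.0) ** 2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2.0) ** 2
    return 2.0 * EARTH_RADIUS_KM * math.asin(math.sqrt(a))


# Two points one degree of latitude apart on the same meridian
one_degree_north = haversine_km(40.0, -74.0, 41.0, -74.0)
print(round(one_degree_north, 2))
```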

In [None]:
# Distance feature (given formula)

def haversine_distance(ddf,
                       start_lat="start_lat",
                       start_lon="start_lon",
                       end_lat="end_lat",
                       end_lon="end_lon"):
    
    # Calculate the great-circle distance in kilometres between two points
    # on the earth (specified in decimal degrees).
    # Vectorized version of the haversine distance for a pandas df.
    
    lat_1_rad, lon_1_rad = np.radians(ddf[start_lat].astype(float)), np.radians(ddf[start_lon].astype(float))
    lat_2_rad, lon_2_rad = np.radians(ddf[end_lat].astype(float)), np.radians(ddf[end_lon].astype(float))
    dlon = lon_2_rad - lon_1_rad
    dlat = lat_2_rad - lat_1_rad

    a = np.sin(dlat / 2.0) ** 2 + np.cos(lat_1_rad) * np.cos(lat_2_rad) * np.sin(dlon / 2.0) ** 2
    c = 2 * np.arcsin(np.sqrt(a))
    haversine_distance = 6371 * c
    return haversine_distance

In [None]:
%%time
ddf["distance"] = haversine_distance(ddf, 
                                     start_lat="pickup_latitude", start_lon="pickup_longitude",
                                     end_lat="dropoff_latitude", end_lon="dropoff_longitude"
                                     )

In [None]:
ddf.distance.describe()

In [None]:
%matplotlib inline
plot_dist(series=ddf[ddf.distance<50].distance, title = "Distance distribution")

### Correlation between distance and fare amount

In [None]:
# Correlation between fare_amount and distance

sns.scatterplot(x="distance", y="fare_amount", palette="icefire",data=ddf[ddf.distance < 80].compute().sample(10000))
plt.show()

### Preparing dataset for model implementation

**Note:** Apply the initial preprocessing steps to the modified dataset above before modelling.

In [None]:
# Use the processed dataset as the training data

data_train = ddf

### Removing outliers from the training set based on coordinates

#### Exercise 8: Remove the outliers using the given latitude and longitude features from the dataset. We only want to analyze taxi trips within New York City.

**Hint:** The coordinates of New York City are latitude 40.7128° and longitude -74.0060°. Keep the pickup and drop-off points that lie between left and right bounds roughly centered on these coordinate values.

Also, choose the nearest extreme values as bounds.

Use `.between()` and pass the `left` and `right` arguments accordingly.
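
The filtering logic can be sketched in plain Python. By default, pandas `Series.between()` includes both endpoints, which the hypothetical `between` helper below mirrors. The bounds are illustrative values around New York City, and the sample points are made up.

```python
def between(value: float, left: float, right: float) -> bool:
    """Inclusive range check, mirroring the default behaviour of Series.between()."""
    return left <= value <= right


def inside_box(lat: float, lon: float,
               lat_range=(40.0, 42.0), lon_range=(-74.3, -72.9)) -> bool:
    """True if the point lies inside the (illustrative) bounding box."""
    return between(lat, *lat_range) and between(lon, *lon_range)


points = [
    (40.7128, -74.0060),  # New York City centre
    (0.0, 0.0),           # a typical bad GPS record
    (40.0, -74.3),        # exactly on the corner: kept, since bounds are inclusive
]
kept = [p for p in points if inside_box(*p)]
print(kept)
```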

In [None]:
%%time
data_train = data_train[data_train["pickup_latitude"].between(left = 40, right = 42 )]
data_train = data_train[data_train["pickup_longitude"].between(left = -74.3, right = -72.9 )]
data_train = data_train[data_train["dropoff_latitude"].between(left = 40, right = 42 )]
# Use the same longitude bounds for drop-off as for pickup
data_train = data_train[data_train["dropoff_longitude"].between(left = -74.3, right = -72.9 )]

In [None]:
%%time
data_train["distance"] = haversine_distance(data_train, 
                                      start_lat="pickup_latitude", start_lon="pickup_longitude",
                                      end_lat="dropoff_latitude", end_lon="dropoff_longitude"
                                     )

In [None]:
data_train.head()

#### Exercise 9: Divide the data into train and test splits with X as feature variables and y as target variable

* Divide the data into train and test splits with a 70-30 ratio. **Hint:** `train_test_split()`

* Dask operates lazily, so call `.compute()` on the dask dataframes before calling `.fit()`.
* Convert X_train and y_train into arrays using `.values`, as the `.fit()` function of [dask-ml](https://ml.dask.org/modules/api.html) takes arrays as arguments
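
For intuition about what a linear fit computes, here is a sketch of closed-form least squares with a single feature, using made-up distances and fares. It is not how dask-ml fits its model: that estimator uses iterative solvers and may apply regularization by default, so its coefficients can differ from plain least squares. `fit_line` is a hypothetical helper.

```python
from typing import List, Tuple


def fit_line(x: List[float], y: List[float]) -> Tuple[float, float]:
    """Ordinary least squares for y = intercept + slope * x (one feature)."""
    n = len(x)
    mean_x = sum(x) / n
    mean_y = sum(y) / n
    # slope = covariance(x, y) / variance(x)
    s_xy = sum((xi - mean_x) * (yi - mean_y) for xi, yi in zip(x, y))
    s_xx = sum((xi - mean_x) ** 2 for xi in x)
    slope = s_xy / s_xx
    intercept = mean_y - slope * mean_x
    return slope, intercept


distance_km = [1.0, 2.0, 3.0, 4.0]
fare = [5.0, 7.0, 9.0, 11.0]  # made-up data following fare = 3 + 2 * distance
slope, intercept = fit_line(distance_km, fare)
print(slope, intercept)
```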

In [None]:
X = data_train.drop(["fare_amount", "pickup_datetime"], axis=1)
y = data_train[["fare_amount"]]

In [None]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=0)

In [None]:
type(X_train)

In [None]:
type(y_train)

In [None]:
X_train = X_train.compute()

In [None]:
y_train = y_train.compute()

In [None]:
# instantiate the model
lr = LinearRegression()

In [None]:
%%time
lr.fit(X_train.values, y_train.values)

#### Exercise 10: Predict the test data and calculate the mean squared error and r2 score.

**Hint:** Dask operates lazily, so remember to call `.compute()` and convert the dataframe to an array with `.values`, as suggested in the exercise above
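
The two metrics are simple enough to check by hand. The sketch below implements the standard definitions in plain Python on made-up values; `mse` and `r2` are hypothetical helpers, not the dask-ml functions.

```python
from typing import List


def mse(y_true: List[float], y_pred: List[float]) -> float:
    """Mean squared error: average of the squared residuals."""
    return sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true)


def r2(y_true: List[float], y_pred: List[float]) -> float:
    """R2 score: 1 - (residual sum of squares / total sum of squares)."""
    mean_true = sum(y_true) / len(y_true)
    ss_res = sum((t - p) ** 2 for t, p in zip(y_true, y_pred))
    ss_tot = sum((t - mean_true) ** 2 for t in y_true)
    return 1.0 - ss_res / ss_tot


y_true = [5.0, 7.0, 9.0, 11.0]   # made-up fares
y_hat = [5.5, 6.5, 9.5, 10.5]    # made-up predictions, each off by 0.5
print(mse(y_true, y_hat), r2(y_true, y_hat))
```

An R2 score close to 1 means the predictions explain most of the variance in the target, while a score near 0 means the model does no better than predicting the mean fare.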

In [None]:
%%time
X_test = X_test.compute()
y_pred = lr.predict(X_test.values)

In [None]:
y_pred

In [None]:
y_test = y_test.to_dask_array(lengths=True)


In [None]:
y_test = y_test.reshape(-1)

In [None]:
y_test.compute()

In [None]:
%%time
# Mean squared error
mean_squared_error(y_test, y_pred)

In [None]:
%%time
# R2 score
r2_score(y_test, y_pred)

### Report Analysis
* Discuss the pros and cons of using dask
* Discuss the EDA insights

