## **MAST30034 Applied Data Science week 1**

#### Today

0. Environment
1. Install PySpark
2. First look at PySpark
3. Download Dataset
4. PySpark basics

##### 0. Environment

##### 1. Installing PySpark

**For WSL2 users:**

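1. Update Ubuntu's package lists
    - `sudo apt update`
2. Check whether Java is already installed (Java 8 only accepts the single-dash form)
    - `java -version`
3. Install Java 8
    - `sudo apt install openjdk-8-jdk`
4. Confirm that Java is now installed
    - `java -version`
5. Set the `JAVA_HOME` path
    - `echo 'JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64"' | sudo tee -a /etc/environment`
6. Apply the change to the current shell
    - `source /etc/environment`
7. Install the Python packages
    - `pip3 install pyspark pyarrow pandas`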
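After the steps above, a quick check from Python can confirm that Java is visible before starting Spark. The following is a minimal sketch using only the standard library; the function name `check_java_setup` is hypothetical and not part of PySpark.

```python
import os
import shutil


def check_java_setup(environ=None):
    """Report whether Java looks reachable (hypothetical helper, not a PySpark API)."""
    environ = os.environ if environ is None else environ
    java_home = environ.get("JAVA_HOME")
    return {
        # Value of JAVA_HOME, or None if it is not set
        "java_home": java_home,
        # True only if JAVA_HOME is set and points at an existing directory
        "java_home_exists": bool(java_home) and os.path.isdir(java_home),
        # True if a `java` executable can be found on the PATH
        "java_on_path": shutil.which("java", path=environ.get("PATH")) is not None,
    }


for key, value in check_java_setup().items():
    print(f"{key}: {value}")
```

If `java_home_exists` or `java_on_path` comes back `False`, revisiting steps 3 to 6 is a reasonable first thing to try.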

**For macOS users:**

Follow the tutorial sheet.

##### 2. First look at PySpark

### PySpark

PySpark vs. Pandas:
1. Faster
2. Larger memory capacity
3. A parallel, unified computing engine

PySpark supports Spark's features such as Spark SQL, DataFrames, Structured Streaming, Machine Learning (MLlib), and Spark Core. Using PySpark, we can run applications in parallel on a distributed cluster (multiple nodes).

PySpark is widely used in the data science and machine learning community, since many popular data science libraries, including NumPy and TensorFlow, are available in Python. It is also valued for its efficient processing of large datasets. PySpark has been used by many organizations such as Walmart, Trivago, Sanofi, and Runtastic.

#### Advantages
- fast speed
- large memory
- combine local and distributed data transformation
- lazy evaluation
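
Lazy evaluation means that transformations are only recorded when they are written, and the work happens when a result is actually requested. The idea is not specific to Spark. The sketch below is an analogy in plain Python using a generator; it does not use Spark itself, and the names `double` and `calls` are made up for illustration.

```python
calls = []  # records each row that has actually been processed


def double(rows):
    """A lazy 'transformation': produces results only when asked for them."""
    for row in rows:
        calls.append(row)
        yield row * 2


# Building the pipeline does no work yet, even over a million rows
pipeline = double(range(1_000_000))
assert calls == []

# Asking for results (the 'action') triggers only the work that is needed
first_three = [next(pipeline) for _ in range(3)]
print(first_three)  # [0, 2, 4]
print(len(calls))   # 3
```

Only three rows were processed even though the pipeline was defined over a million, which is the same reason a lazily evaluated query can avoid unnecessary computation.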

### Basics

In [1]:
import warnings
warnings.filterwarnings("ignore")

In [2]:
# Start a Spark session; all subsequent analysis runs through this session

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("CPU ADS wk 1")
    .config("spark.sql.repl.eagerEval.enabled", True)
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.sql.session.timeZone", "Etc/UTC")
    # If the data later turns out to be too large for memory, raise these values
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "8g")
    .getOrCreate()
)


23/08/02 14:47:49 WARN Utils: Your hostname, Luo resolves to a loopback address: 127.0.1.1; using 172.17.36.219 instead (on interface eth0)
23/08/02 14:47:49 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/08/02 14:47:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
conf = spark.sparkContext.getConf()
print("spark.app.name = ", conf.get("spark.app.name"))

spark.app.name =  CPU ADS wk 1


**An important property of Spark DataFrames is that they are immutable: each transformation returns a new DataFrame instead of modifying the existing one.**

##### 3. Download Dataset

In [4]:
from urllib.request import urlretrieve
import os
import sys

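Before starting the project, we should first organise the project's folder structure.

For example, all data goes under project/data/ and all code under project/notebooks/. The data is then subdivided into raw data, curated data, and so on.

So as a first step, we manually create a notebooks folder and, inside it, our first notebook, which we use to download the data.

Then we create the data folders.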

In [5]:
output_relative_dir = '../data/'

# 'stage' avoids shadowing the built-in name `type`
for stage in ['raw', 'curate']:
    for source in ['external', 'tlc_data']:
        # e.g. ../data/raw/external or ../data/curate/tlc_data
        path = os.path.join(output_relative_dir, stage, source)
        # makedirs also creates missing parent folders (../data, ../data/raw, ...);
        # exist_ok=True makes the cell safe to rerun
        os.makedirs(path, exist_ok=True)

Create one folder per year for the raw TLC data

In [6]:
for year in ['2019', '2021']:
    path = '../data/raw/tlc_data/'
    # exist_ok=True makes the cell safe to rerun
    os.makedirs(path + year, exist_ok=True)

We may process the data in batches and do some preliminary cleaning first, so inside the curated folder we separate the data by cleaning stage.

In [7]:
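output_relative_dir = '../data/curate/tlc_data/'
# Creates ../data/curate/tlc_data/first_clean and ../data/curate/tlc_data/final_data
for stage in ['first_clean', 'final_data']:
    os.makedirs(output_relative_dir + stage, exist_ok=True)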
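
The folder-creation cells above can also be collected into one function, which makes the layout easy to recreate on another machine. This is a minimal sketch using `pathlib`; the function name `make_project_dirs` is hypothetical, and the folder names are the ones used in the cells above.

```python
from pathlib import Path


def make_project_dirs(base):
    """Create the data folder tree under `base` and return the relative paths."""
    base = Path(base)
    targets = [base / stage / source
               for stage in ("raw", "curate")
               for source in ("external", "tlc_data")]
    targets += [base / "raw" / "tlc_data" / year for year in ("2019", "2021")]
    targets += [base / "curate" / "tlc_data" / step
                for step in ("first_clean", "final_data")]
    for target in targets:
        # parents=True creates missing parent folders; exist_ok=True makes reruns safe
        target.mkdir(parents=True, exist_ok=True)
    return sorted(t.relative_to(base).as_posix() for t in targets)
```

Calling `make_project_dirs('../data')` from the notebooks folder would build the same tree as the cells above, and running it twice has no further effect.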

The folder structure is now roughly in place, so we can download the raw data. The target paths are:

`project/data/raw/tlc_data/2019` and `project/data/raw/tlc_data/2021`

First, find the full download URL: `https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_{YEAR}-{MONTH}.parquet`

In [8]:
URL_TEMPLATE = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_"

Set up the download URL template, along with the range of years and months we need

In [9]:
YEAR = ['2019', '2021']

MONTH = range(1, 13)

output_relative_dir = f'../data/raw/tlc_data/'
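
Before downloading anything, it can help to list exactly which URLs will be fetched and where each file will land. The sketch below builds that list from the template and ranges defined above; `build_download_plan` is a hypothetical helper name, and nothing is downloaded here.

```python
import os

URL_TEMPLATE = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_"


def build_download_plan(years, months, output_dir):
    """Return (url, local_path) pairs, one per year-month combination."""
    plan = []
    for year in years:
        for month in months:
            stamp = f"{year}-{int(month):02d}"  # zero-padded, e.g. 2019-03
            url = f"{URL_TEMPLATE}{stamp}.parquet"
            local_path = os.path.join(output_dir, str(year), f"{stamp}.parquet")
            plan.append((url, local_path))
    return plan


plan = build_download_plan(["2019", "2021"], range(1, 13), "../data/raw/tlc_data/")
print(len(plan))   # 24 files: 2 years x 12 months
print(plan[2][0])  # URL for March 2019
```

Printing the plan first makes typos in the template or the year list visible before any time is spent downloading.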

Use two nested for loops to download every month of data for both years

In [10]:
for year in YEAR:
    for month in MONTH:
        print(f'starting {year} {month}')
        # Zero-pad the month so that 3 becomes '03'
        month = str(month).zfill(2)
        # e.g. url = https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2019-03.parquet
        url = f'{URL_TEMPLATE}{year}-{month}.parquet'
        # os.path.join avoids a doubled '/' when output_relative_dir already ends with one
        output_path = os.path.join(output_relative_dir, year, f'{year}-{month}.parquet')
        urlretrieve(url, output_path)
        print(f'finished {year} {month}')


starting 2019 1
finished 2019 01
starting 2019 2
finished 2019 02
starting 2019 3
finished 2019 03
starting 2019 4
finished 2019 04
starting 2019 5
finished 2019 05
starting 2019 6
finished 2019 06
starting 2019 7
finished 2019 07
starting 2019 8
finished 2019 08
starting 2019 9
finished 2019 09
starting 2019 10
finished 2019 10
starting 2019 11
finished 2019 11
starting 2019 12
finished 2019 12
starting 2021 1
finished 2021 01
starting 2021 2
finished 2021 02
starting 2021 3
finished 2021 03
starting 2021 4
finished 2021 04
starting 2021 5
finished 2021 05
starting 2021 6
finished 2021 06
starting 2021 7
finished 2021 07
starting 2021 8
finished 2021 08
starting 2021 9
finished 2021 09
starting 2021 10
finished 2021 10
starting 2021 11
finished 2021 11
starting 2021 12
finished 2021 12
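
Rerunning the loop above downloads all 24 files again, and an interrupted run can leave a partial file behind. One possible refinement, sketched here under those assumptions, is to skip files that already exist and to write to a temporary name first. The helper name `download_if_missing` and the `.part` suffix are hypothetical choices.

```python
import os
from urllib.request import urlretrieve


def download_if_missing(url, output_path, fetch=urlretrieve):
    """Download `url` to `output_path` unless a non-empty file is already there.

    Returns True if a download happened and False if it was skipped.
    `fetch` is any callable taking (url, path); it defaults to urlretrieve.
    """
    if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
        return False
    os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True)
    # Write to a temporary name first so that an interrupted download
    # never leaves a half-written file under the final name.
    partial_path = output_path + ".part"
    fetch(url, partial_path)
    os.replace(partial_path, output_path)
    return True
```

Inside the download loop, `urlretrieve(url, output_path)` could be swapped for `download_if_missing(url, output_path)`. The `fetch` parameter exists mainly so the function can be tried out without network access.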


##### 4. PySpark Basics

#### A new data format: Parquet

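- data is stored by column rather than by row
- each column holds a single data type
- compresses much better than row-based text formats such as CSV
- faster to read and query, especially when only some columns are needed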
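
Because Parquet is a binary format, a failed download (for example an HTML error page saved under a `.parquet` name) is not obvious from the file name alone. Parquet files normally begin and end with the 4-byte marker `PAR1`, so a cheap sanity check is possible with the standard library. This is only a sketch: the helper name `looks_like_parquet` is hypothetical, and passing the check does not prove the file is fully valid.

```python
import os

PARQUET_MAGIC = b"PAR1"


def looks_like_parquet(path):
    """Return True if the file starts and ends with the Parquet magic bytes."""
    # A real file needs at least the header marker, a 4-byte footer length
    # and the footer marker, so anything under 12 bytes cannot be Parquet.
    if os.path.getsize(path) < 12:
        return False
    with open(path, "rb") as f:
        head = f.read(4)
        f.seek(-4, os.SEEK_END)
        tail = f.read(4)
    return head == PARQUET_MAGIC and tail == PARQUET_MAGIC
```

Running this over each downloaded file before reading it with Spark can catch truncated or mislabelled files early.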


#### Read in data

In [5]:
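# Folder containing the Parquet files; adjust this if the files were saved elsewhere (e.g. ../data/raw/tlc_data/2019/)
path = '../data/'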

In [10]:
sdf = spark.read.parquet(path + "2019-01.parquet")
sdf

                                                                                

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
1,2019-01-01 00:46:40,2019-01-01 00:53:20,1.0,1.5,1.0,N,151,239,1,7.0,0.5,0.5,1.65,0.0,0.3,9.95,,
1,2019-01-01 00:59:47,2019-01-01 01:18:59,1.0,2.6,1.0,N,239,246,1,14.0,0.5,0.5,1.0,0.0,0.3,16.3,,
2,2018-12-21 13:48:30,2018-12-21 13:52:40,3.0,0.0,1.0,N,236,236,1,4.5,0.5,0.5,0.0,0.0,0.3,5.8,,
2,2018-11-28 15:52:25,2018-11-28 15:55:45,5.0,0.0,1.0,N,193,193,2,3.5,0.5,0.5,0.0,0.0,0.3,7.55,,
2,2018-11-28 15:56:57,2018-11-28 15:58:33,5.0,0.0,2.0,N,193,193,2,52.0,0.0,0.5,0.0,0.0,0.3,55.55,,
2,2018-11-28 16:25:49,2018-11-28 16:28:26,5.0,0.0,1.0,N,193,193,2,3.5,0.5,0.5,0.0,5.76,0.3,13.31,,
2,2018-11-28 16:29:37,2018-11-28 16:33:43,5.0,0.0,2.0,N,193,193,2,52.0,0.0,0.5,0.0,0.0,0.3,55.55,,
1,2019-01-01 00:21:28,2019-01-01 00:28:37,1.0,1.3,1.0,N,163,229,1,6.5,0.5,0.5,1.25,0.0,0.3,9.05,,
1,2019-01-01 00:32:01,2019-01-01 00:45:39,1.0,3.7,1.0,N,229,7,1,13.5,0.5,0.5,3.7,0.0,0.3,18.5,,
1,2019-01-01 00:57:32,2019-01-01 01:09:32,2.0,2.1,1.0,N,141,234,1,10.0,0.5,0.5,1.7,0.0,0.3,13.0,,




In [18]:
sdf.count()

7696617

In [19]:
path + '*.parquet'

'../data/*.parquet'

In [6]:
full_sdf = spark.read.parquet(path + '*.parquet')
full_sdf.limit(5)

                                                                                

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
1,2019-01-01 00:46:40,2019-01-01 00:53:20,1.0,1.5,1.0,N,151,239,1,7.0,0.5,0.5,1.65,0.0,0.3,9.95,,
1,2019-01-01 00:59:47,2019-01-01 01:18:59,1.0,2.6,1.0,N,239,246,1,14.0,0.5,0.5,1.0,0.0,0.3,16.3,,
2,2018-12-21 13:48:30,2018-12-21 13:52:40,3.0,0.0,1.0,N,236,236,1,4.5,0.5,0.5,0.0,0.0,0.3,5.8,,
2,2018-11-28 15:52:25,2018-11-28 15:55:45,5.0,0.0,1.0,N,193,193,2,3.5,0.5,0.5,0.0,0.0,0.3,7.55,,
2,2018-11-28 15:56:57,2018-11-28 15:58:33,5.0,0.0,2.0,N,193,193,2,52.0,0.0,0.5,0.0,0.0,0.3,55.55,,


In [21]:
full_sdf.count()

84598444

In [22]:
import pandas as pd
df = pd.read_parquet(path + '2019-01.parquet')
df

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
0,1,2019-01-01 00:46:40,2019-01-01 00:53:20,1.0,1.50,1.0,N,151,239,1,7.00,0.50,0.5,1.65,0.00,0.3,9.95,,
1,1,2019-01-01 00:59:47,2019-01-01 01:18:59,1.0,2.60,1.0,N,239,246,1,14.00,0.50,0.5,1.00,0.00,0.3,16.30,,
2,2,2018-12-21 13:48:30,2018-12-21 13:52:40,3.0,0.00,1.0,N,236,236,1,4.50,0.50,0.5,0.00,0.00,0.3,5.80,,
3,2,2018-11-28 15:52:25,2018-11-28 15:55:45,5.0,0.00,1.0,N,193,193,2,3.50,0.50,0.5,0.00,0.00,0.3,7.55,,
4,2,2018-11-28 15:56:57,2018-11-28 15:58:33,5.0,0.00,2.0,N,193,193,2,52.00,0.00,0.5,0.00,0.00,0.3,55.55,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
7696612,2,2019-01-31 23:37:20,2019-02-01 00:10:43,,10.24,,,142,95,0,0.00,2.75,0.0,0.00,5.76,0.3,0.00,,
7696613,2,2019-01-31 23:28:00,2019-01-31 23:50:50,,12.43,,,48,213,0,48.80,5.50,0.0,0.00,0.00,0.3,54.60,,
7696614,2,2019-01-31 23:11:00,2019-01-31 23:46:00,,9.14,,,159,246,0,51.05,2.75,0.5,0.00,0.00,0.3,54.60,,
7696615,2,2019-01-31 23:03:00,2019-01-31 23:14:00,,0.00,,,265,265,0,0.00,0.00,0.5,9.82,0.00,0.3,0.00,,


In [23]:
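from pathlib import Path
import pandas as pd

# pandas reads every file fully into memory before concatenating,
# so this can be slow or run out of memory on a laptop
data_dir = Path('../data/')
full_df = pd.concat(
    pd.read_parquet(parquet_file)
    for parquet_file in data_dir.glob('*.parquet')
)
# pandas DataFrames have no .limit(); .head(5) returns the first five rows
full_df.head(5)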


#### Overview of Data

In [7]:
full_sdf.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: integer (nullable = true)



In [12]:
sdf.select('passenger_count').describe()

                                                                                

summary,passenger_count
count,7667945.0
mean,1.5670317144945614
stddev,1.2244198591042097
min,0.0
max,9.0


                                                                                

