# Introduction to PySpark

According to the documentation, "PySpark is the Python API for Apache Spark. It enables you to perform real-time, large-scale data processing in a distributed environment using Python." So, basically, PySpark is a way to read and work with big data. Most of what you can do with pandas, for instance, you can also do with PySpark, and when the data is big, PySpark can process it faster because it spreads the work across several cores or machines.

When should you use PySpark instead of pandas? Basically, when you are dealing with big data. For data that fits comfortably in the memory of a single machine, pandas or polars usually remain the simpler choice, so PySpark does not replace them.

### If the notebook was useful in any way, please upvote it!

### Functions in the notebook

In this notebook we will work with the following functions and techniques:

- read.csv
- printSchema
- select
- describe
- check NA
- rename column
- filter
- withColumn, using When-Otherwise
- group by - agg

- Extra: dealing with dates and timestamps

In [None]:
# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

In [None]:
%pip install pyspark

In [None]:
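from pyspark.sql import SparkSession
# Note: this wildcard import shadows Python built-ins such as round, max, min and sum;
# the cells below rely on the PySpark versions of these names
from pyspark.sql.functions import *
from pyspark.sql.types import *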

### Create/Start PySpark session

In [None]:
spark = (
    SparkSession.builder
    .master('local')  # Run locally with one worker thread ('local[*]' would use all cores)
    .appName('PySpark_Introduction')
    .getOrCreate()  # Reuses the active session if there is one, otherwise creates a new one
)



### Reading the data into a DataFrame

In [None]:
df = spark.read.csv('/kaggle/input/taxi-trips-chicago-2024/Taxi_Trips_-_2024_20240408.csv',
                    header=True,       # Use the first row as column names
                    inferSchema=True,  # Infer the column types (costs an extra pass over the file)
                    sep=',',           # Field delimiter
                    # nullValue='NA',  # Optional: string to read as null; missing values are displayed as NULL
                    )


### Printing the column types

In [None]:
df.printSchema()

### select() function

In [None]:
df.select('Trip ID', 'Trip Start Timestamp', 'Trip End Timestamp', 'Trip Miles',
          'Fare', 'Tips', 'Pickup Census Tract', 'Payment Type').show(truncate=False)

In [None]:
df.select('Trip ID','Trip Start Timestamp').show(5)

In [None]:
# alias() renames the column only in this result; df itself still has 'Fare'
df.select(col('Fare').alias('Payment')).show(5)

### Checking some basic statistics

In [None]:
df.select('Fare', 'Tips').describe().show()

In [None]:
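# Implementation with round: describe() returns its statistics as strings,
# so cast them back to double before rounding
df.describe('Fare', 'Tips')\
  .select('summary', *[round(col(c).cast('double'), 2).alias(c) for c in ['Fare', 'Tips']])\
  .show()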

### Checking for NAs

In [None]:
# Count the nulls in the first five columns (this runs one Spark job per column)
for column in df.columns[0:5]:
    print(column, df.filter(df[column].isNull()).count())

### Renaming a column

In [None]:
df = df.withColumnRenamed('Fare', 'Payment')

df.select('Payment').show(5)

### Renaming several columns

In [None]:
# withColumnsRenamed takes a dict {old name: new name}; it requires PySpark 3.4 or later
df = df.withColumnsRenamed({'Payment': 'Fare', 'Tips': 'TipsDriver'})

df.select('Fare', 'TipsDriver').show(5)

### filter() function

In [None]:
df.filter('TipsDriver >= 20').select('TipsDriver').show(10)

In [None]:
df.filter(col("Payment Type") == "Cash").select('Fare', 'TipsDriver', 'Payment Type').show(5)

In [None]:
df.filter((col("Payment Type") == "Cash") & (col('TipsDriver') >= 20)).select('Payment Type','TipsDriver').show()

In [None]:
df.filter(col('TipsDriver').isNull()).select('Payment Type','TipsDriver').show()

### Creating columns

#### Using when/otherwise

In [None]:
# Flag morning timestamps: 1 if the raw timestamp string contains 'AM', otherwise 0
df = df.withColumn('Trip Start Day',
                   when(col('Trip Start Timestamp').contains('AM'), 1).otherwise(0))\
       .withColumn('Trip End Day',
                   when(col('Trip End Timestamp').contains('AM'), 1).otherwise(0))

df.select('Trip Start Timestamp', 'Trip Start Day', 'Trip End Timestamp', 'Trip End Day').show(truncate=False)

### Group by - Agg

In [None]:
df.printSchema()

In [None]:
# The dict form of agg() names the result column 'avg(Fare)', which is what orderBy refers to
df.groupBy('Payment Type').agg({'Fare': 'avg'}).orderBy('avg(Fare)', ascending=False).show()

In [None]:
# count('Fare') counts only the rows where Fare is not null
df.groupBy('Payment Type').agg(round(mean('Fare'), 2).alias('Mean'),
                               max('Fare').alias('Max'),
                               min('Fare').alias('Min'),
                               round(sum('Fare'), 2).alias('Sum'),
                               count('Fare').alias('Count'))\
  .orderBy('Mean', ascending=False).show()

### Dealing with dates and timestamps

In [None]:
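# The raw strings use a 12-hour clock with an AM/PM marker (the when/otherwise cell above relies on this).
# 'HH' is a 24-hour field and leaves the AM/PM marker unread, so with the legacy parser 01:30 PM would be
# read as 01:30. 'hh' (hour 1-12) plus 'a' (AM/PM) parses these strings correctly with the default parser.

# Setting the Timestamp variables as TimestampType
df = df.withColumn('Trip Start Timestamp', to_timestamp('Trip Start Timestamp', 'MM/dd/yyyy hh:mm:ss a'))\
       .withColumn('Trip End Timestamp', to_timestamp('Trip End Timestamp', 'MM/dd/yyyy hh:mm:ss a'))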

df.printSchema()