<a href="https://colab.research.google.com/github/Richish/spark_on_gpus/blob/main/01_basic_spark_processing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [19]:
# imports
#! pip install pyspark
import pyspark
import pandas as pd

In [5]:
# connecting to drive
from google.colab import drive
drive.mount('/content/drive/')

Mounted at /content/drive/


In [18]:
# finding data
! ls /content/drive/MyDrive/colab_notebooks/datasets/taxi-small/trainWithEval

test.csv


In [21]:
train_file = "/content/drive/MyDrive/colab_notebooks/datasets/taxi-small/train/taxi-small.csv"
eval_file = "/content/drive/MyDrive/colab_notebooks/datasets/taxi-small/eval/taxi-small.csv"
test_file = "/content/drive/MyDrive/colab_notebooks/datasets/taxi-small/trainWithEval/test.csv"

In [22]:
# creating pandas dfs
train_df, eval_df, test_df = pd.read_csv(train_file), pd.read_csv(eval_file), pd.read_csv(test_file)
train_df, eval_df, test_df

(      452563162  1   1.74  -73.99114500000002  ...  13  6.0  1.0  2.6339725754785412
 0     452563162  5   2.72          -73.948132  ...  13  6.0  1.0            4.025153
 1     452563162  1   0.94          -73.982477  ...  13  6.0  1.0            1.153509
 2     452563162  1   3.63          -73.977707  ...  13  6.0  1.0            5.170668
 3     452563162  2  11.86          -73.864075  ...  13  6.0  1.0           12.644747
 4     452563162  5   3.03          -73.970228  ...  13  6.0  1.0            3.869515
 ...         ... ..    ...                 ...  ...  ..  ...  ...                 ...
 7994  452563162  1   2.10          -73.977258  ...  13  6.0  1.0            3.053085
 7995  452563162  1   3.33          -73.991152  ...  13  6.0  1.0            5.412012
 7996  452563162  1   4.28          -74.002435  ...  13  6.0  1.0            4.941396
 7997  452563162  1   1.04          -73.983282  ...  13  6.0  1.0            1.488445
 7998  452563162  1   0.63          -73.983223  ...  1

In [23]:
train_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 7999 entries, 0 to 7998
Data columns (total 17 columns):
 #   Column              Non-Null Count  Dtype  
---  ------              --------------  -----  
 0   452563162           7999 non-null   int64  
 1   1                   7999 non-null   int64  
 2   1.74                7999 non-null   float64
 3   -73.99114500000002  7999 non-null   float64
 4   40.76553700000001   7999 non-null   float64
 5   -677418915          7999 non-null   int64  
 6   -1                  7999 non-null   int64  
 7   -73.97599199999999  7999 non-null   float64
 8   40.786277000000005  7999 non-null   float64
 9   7.5                 7999 non-null   float64
 10  10                  7999 non-null   int64  
 11  2012                7999 non-null   int64  
 12  11                  7999 non-null   int64  
 13  13                  7999 non-null   int64  
 14  6.0                 7999 non-null   float64
 15  1.0                 7999 non-null   float64
 16  2.6339

In [28]:
# getting to spark
# creating spark session
from pyspark.sql import SparkSession
from pyspark.sql.functions import count

spark = SparkSession.builder.appName("Taxi").getOrCreate()

In [29]:
train_df = spark.read.csv(train_file)

In [27]:
train_df.head()

Row(_c0='452563162', _c1='1', _c2='1.74', _c3='-73.99114500000002', _c4='40.76553700000001', _c5='-677418915', _c6='-1', _c7='-73.97599199999999', _c8='40.786277000000005', _c9='7.5', _c10='10', _c11='2012', _c12='11', _c13='13', _c14='6.0', _c15='1.0', _c16='2.6339725754785412')

In [41]:
# creating schema for the dataframe
from pyspark.sql.types import StructType, StructField, StructField, DoubleType, IntegerType
schema = StructType([
StructField("vendor_id", DoubleType()),
StructField("passenger_count", DoubleType()),
StructField("trip_distance", DoubleType()),
StructField("pickup_longitude", DoubleType()),
StructField("pickup_latitude", DoubleType()),
StructField("rate_code", DoubleType()),
StructField("store_and_fwd", DoubleType()),
StructField("dropoff_longitude", DoubleType()),
StructField("dropoff_latitude", DoubleType()),
StructField("fare_amount", DoubleType()),
StructField("hour", DoubleType()),
StructField("year", IntegerType()),
StructField("month", IntegerType()),
StructField("day", DoubleType()),
StructField("day_of_week", DoubleType()),
StructField("is_weekend", DoubleType()) ])

In [43]:
# putting schema to the dataframe
train_df = spark.read.option("infer_schema", False).option("header", True).schema(schema).csv(train_file)
train_df.head()

Row(vendor_id=452563162.0, passenger_count=5.0, trip_distance=2.72, pickup_longitude=-73.948132, pickup_latitude=40.829826999999995, rate_code=-677418915.0, store_and_fwd=-1.0, dropoff_longitude=-73.969648, dropoff_latitude=40.797472000000006, fare_amount=11.5, hour=10.0, year=2012, month=11, day=13.0, day_of_week=6.0, is_weekend=1.0)

In [45]:
train_df.show(1)

+------------+---------------+-------------+----------------+------------------+-------------+-------------+-----------------+------------------+-----------+----+----+-----+----+-----------+----------+
|   vendor_id|passenger_count|trip_distance|pickup_longitude|   pickup_latitude|    rate_code|store_and_fwd|dropoff_longitude|  dropoff_latitude|fare_amount|hour|year|month| day|day_of_week|is_weekend|
+------------+---------------+-------------+----------------+------------------+-------------+-------------+-----------------+------------------+-----------+----+----+-----+----+-----------+----------+
|4.52563162E8|            5.0|         2.72|      -73.948132|40.829826999999995|-6.77418915E8|         -1.0|       -73.969648|40.797472000000006|       11.5|10.0|2012|   11|13.0|        6.0|       1.0|
+------------+---------------+-------------+----------------+------------------+-------------+-------------+-----------------+------------------+-----------+----+----+-----+----+-----------+--

In [53]:
# select join groupby
train_df.groupby("hour").count().sort("hour").show()

+----+-----+
|hour|count|
+----+-----+
| 0.0|   12|
| 1.0|   49|
| 2.0|  658|
| 3.0|  742|
| 4.0|  812|
| 5.0|   89|
| 6.0|  464|
| 7.0|  678|
| 8.0|  364|
| 9.0| 1055|
|10.0| 1303|
|11.0| 1422|
|12.0|  321|
|13.0|    3|
|14.0|    3|
|16.0|    4|
|17.0|    2|
|18.0|    5|
|19.0|    4|
|21.0|    4|
+----+-----+
only showing top 20 rows



In [58]:
# filters
train_df.select("hour", "fare_amount").filter("hour==0").show(5)

+----+-----------+
|hour|fare_amount|
+----+-----------+
| 0.0|       10.5|
| 0.0|       12.5|
| 0.0|       11.0|
| 0.0|        7.5|
| 0.0|       18.5|
+----+-----------+
only showing top 5 rows

