In [33]:
############
# Optional cleanup (disabled): uncomment to clear Spark's in-memory table cache.
#spark.catalog.clearCache()

### --------------------------------------------------

In [1]:
import os

import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [2]:
# Build (or reuse) the SparkSession that connects to the standalone master and
# reaches MinIO through the s3a connector.
#
# The MinIO endpoint and credentials are read from the environment so that real
# secrets never have to be saved in the notebook. The fallbacks are the values
# this cell previously hard-coded, so it still runs when nothing is exported.
minio_endpoint = os.environ.get("MINIO_ENDPOINT", "http://minio:9000")
minio_access_key = os.environ.get("MINIO_ACCESS_KEY", "minio")
minio_secret_key = os.environ.get("MINIO_SECRET_KEY", "minio123")

spark = (
    SparkSession.builder
    .appName("Jupyter with Spark")
    .master("spark://master:7077")
    .config("spark.hadoop.fs.s3a.endpoint", minio_endpoint)
    .config("spark.hadoop.fs.s3a.access.key", minio_access_key)
    .config("spark.hadoop.fs.s3a.secret.key", minio_secret_key)
    # Path-style addressing and plain HTTP, matching the endpoint default above.
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    # Jars that provide the s3a filesystem implementation configured above.
    .config("spark.jars", "/opt/spark/jars/hadoop-aws-3.3.4.jar,/opt/spark/jars/aws-java-sdk-bundle-1.12.262.jar")
    .getOrCreate()
)

In [3]:
# Show the SparkSession object as this cell's output.
spark

In [4]:
# Load the taxi trip records from the bucket over s3a.
# "header" is a CSV reader option and has no effect on Parquet, which stores
# its own schema, so it is not passed here.
df = spark.read.parquet("s3a://mybucket/taxi/input/NYC-Taxi.parquet")

In [5]:
# Total number of rows in the loaded DataFrame.
df.count()

2964624

In [14]:
# Print column names, types and nullability as read from the Parquet file.
df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



In [17]:
# Keep only the rows whose trip_distance is greater than 10.
long_trip = df.where(df["trip_distance"] > 10)

In [21]:
# Number of rows that passed the trip_distance > 10 filter.
long_trip.count()

228254

In [20]:
# Summary statistics (count, mean, stddev, min, max) for the distance and
# amount columns.
(
    df
    .describe("trip_distance", "fare_amount", "total_amount")
    .show()
)

+-------+------------------+------------------+------------------+
|summary|     trip_distance|       fare_amount|      total_amount|
+-------+------------------+------------------+------------------+
|  count|           2964624|           2964624|           2964624|
|   mean|3.6521691789580624|18.175061916792536| 26.80150477092493|
| stddev|225.46257238220082|18.949547705905324|23.385577429672043|
|    min|               0.0|            -899.0|            -900.0|
|    max|          312722.3|            5000.0|            5000.0|
+-------+------------------+------------------+------------------+



In [32]:
# List the distinct values present in payment_type.
(
    df
    .select("payment_type")
    .distinct()
    .show()
)

+------------+
|payment_type|
+------------+
|           1|
|           3|
|           2|
|           4|
|           0|
+------------+



In [33]:
# Row count for each payment_type value.
(
    df
    .groupBy("payment_type")
    .count()
    .show()
)

+------------+-------+
|payment_type|  count|
+------------+-------+
|           1|2319046|
|           3|  19597|
|           2| 439191|
|           4|  46628|
|           0| 140162|
+------------+-------+



In [35]:
# NOTE(review): these wildcard imports rebind names in the notebook namespace.
# After this cell, `min` refers to the Spark column function rather than the
# Python builtin (other builtins may be affected the same way - verify).
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [39]:
# Smallest payment_type value in the dataset.
# Uses the explicitly namespaced Spark aggregate so the cell does not depend on
# a wildcard import having replaced Python's builtin `min`.
df.select(F.min("payment_type")).show()

+-----------------+
|min(payment_type)|
+-----------------+
|                0|
+-----------------+



In [6]:
# Row count for each passenger_count value; kept in `result` and then displayed.
result = (
    df
    .groupBy("passenger_count")
    .count()
)
result.show()

+---------------+-------+
|passenger_count|  count|
+---------------+-------+
|              0|  31465|
|              7|      8|
|              6|  22353|
|              5|  33506|
|              1|2188739|
|              3|  91262|
|              8|     51|
|              2| 405103|
|              4|  51974|
|              9|      1|
|           NULL| 140162|
+---------------+-------+



In [15]:
# Shut down the Spark session once the analysis is finished.
spark.stop()