In [1]:
import pyspark
from pyspark.sql import SparkSession

# Start (or reuse) a local Spark session that uses every available core.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

In [2]:
# Load the February 2021 FHV trip records.
# Parquet files embed their own schema, so no reader options are needed;
# `header` is a CSV-only option and has no effect on the Parquet reader.
df = spark.read.parquet('fhv_tripdata_2021-02.parquet')

In [3]:
# Expose the DataFrame to Spark SQL under the name `fhv`.
# createOrReplaceTempView is the supported replacement for registerTempTable,
# which has been deprecated since Spark 2.0.
df.createOrReplaceTempView('fhv')



In [7]:
# Question: how many trips were picked up on 15 February 2021?
# The pickup timestamp is truncated to a date before comparing.
query = """
    select count(*) 
    from fhv
    where to_date(pickup_datetime) = '2021-02-15'
    """

spark.sql(query).show()

+--------+
|count(1)|
+--------+
|   35709|
+--------+



In [9]:
# Question: what is the longest trip (in minutes) for each pickup day?
# The CTE converts both timestamps to epoch seconds and divides the
# difference by 60; the outer query keeps the per-day maximum, with the
# longest days listed first.
query = """
    with duration_cte as (
        select
            to_date(pickup_datetime) as date,
            ((bigint(to_timestamp(dropOff_datetime))) - (bigint(to_timestamp(pickup_datetime))))/60 as durations
        from fhv
    )
    select date, max(durations) as max_duration_minutes 
    from duration_cte
    group by 1
    order by 2 desc
    """

spark.sql(query).show()

+----------+--------------------+
|      date|max_duration_minutes|
+----------+--------------------+
|2021-02-05|            110919.0|
|2021-02-01|             46290.0|
|2021-02-25|             40489.0|
|2021-02-23|             40352.0|
|2021-02-04|   40034.88333333333|
|2021-02-27|             17084.0|
|2021-02-28|             15763.0|
|2021-02-15|            14670.15|
|2021-02-22|  13001.533333333333|
|2021-02-08|   9424.916666666666|
|2021-02-19|             9012.15|
|2021-02-13|   8422.683333333332|
|2021-02-16|              4816.1|
|2021-02-12|              4344.0|
|2021-02-17|   4284.783333333334|
|2021-02-11|  3219.8166666666666|
|2021-02-24|   2767.733333333333|
|2021-02-06|   2752.633333333333|
|2021-02-18|  2749.0333333333333|
|2021-02-20|  2701.4666666666667|
+----------+--------------------+
only showing top 20 rows



In [20]:
# Question: which five `dispatching_base_num` values occur most often?
# Rows are grouped by base number and ranked by their row count, descending.
query = """
    select dispatching_base_num
    , count(*) as count
    from fhv
    group by 1
    order by 2 desc
    limit 5
    """

spark.sql(query).show()

+--------------------+-----+
|dispatching_base_num|count|
+--------------------+-----+
|              B00856|35077|
|              B01312|33089|
|              B01145|31114|
|              B02794|30397|
|              B03016|29794|
+--------------------+-----+



In [24]:
# Question: which five (PUlocationID, DOlocationID) pairs are most common?
# Trips missing either location ID are excluded before counting.
query = """
    select 
        PUlocationID
        , DOlocationID
        , count(*) as count
    from fhv
    where PUlocationID is not null and DOlocationID is not null
    group by 1,2
    order by 3 desc
    limit 5
    """

spark.sql(query).show()

+------------+------------+-----+
|PUlocationID|DOlocationID|count|
+------------+------------+-----+
|       206.0|       206.0| 2374|
|       221.0|       206.0| 2112|
|       129.0|       129.0| 1902|
|         7.0|         7.0| 1829|
|       179.0|       179.0| 1736|
+------------+------------+-----+



### Importing to BigQuery and GCS

In [29]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext

In [33]:
import os

# Path to the service-account key file used for Google Cloud authentication.
# Prefer the GOOGLE_APPLICATION_CREDENTIALS environment variable so the
# notebook is not tied to one user's machine; fall back to the original
# local path when the variable is unset or empty.
credentials_location = (
    os.environ.get('GOOGLE_APPLICATION_CREDENTIALS')
    or r'C:\Users\Billie\.google\credentials\google_credentials.json'
)

# Spark configuration: local master on all cores, the GCS connector jar, and
# service-account authentication pointing at the key file above.
# NOTE(review): only the GCS connector jar is listed here, while later cells
# write with format('bigquery') — confirm how the BigQuery connector is
# put on the classpath.
conf = (
    SparkConf()
    .setMaster('local[*]')
    .setAppName('test')
    .set("spark.jars", "./lib/gcs-connector-hadoop3-2.2.5.jar")
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true")
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credentials_location)
)

In [34]:
# Only one SparkContext can be active per process, and the session created at
# the top of the notebook already owns one. Stop it first so this cell also
# works under "Restart Kernel -> Run All" instead of raising on the second
# context.
active_session = SparkSession.getActiveSession()
if active_session is not None:
    active_session.stop()

# Create the context from the GCS-enabled configuration.
sc = SparkContext(conf=conf)

# Register the gs:// filesystem implementations and the service-account
# credentials on the underlying Hadoop configuration.
hadoop_conf = sc._jsc.hadoopConfiguration()

hadoop_conf.set("fs.AbstractFileSystem.gs.impl",  "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoop_conf.set("fs.gs.auth.service.account.json.keyfile", credentials_location)
hadoop_conf.set("fs.gs.auth.service.account.enable", "true")

In [35]:
# Build the SQL session from the configuration of the context created above.
spark = (
    SparkSession.builder
    .config(conf=sc.getConf())
    .getOrCreate()
)

In [36]:
# Reload the February 2021 FHV trips in the new session and register them as
# the SQL view `fhv`. Parquet embeds its own schema, so the CSV-only `header`
# option is not needed (the Parquet reader ignores it).
df = spark.read.parquet('fhv_tripdata_2021-02.parquet')
df.createOrReplaceTempView('fhv')

In [None]:
# How many taxi trips were there on February 15?
# The query counts rows whose pickup date is 2021-02-15, matching the question
# above. The aggregate is aliased because an unaliased count is named like
# `count(1)`, which is not a plain identifier for the destination column.
query = """
    select count(*) as trip_count
    from fhv
    where to_date(pickup_datetime) = '2021-02-15'
    """

task_1 = spark.sql(query)
task_1.show()
task_1.printSchema()

# Write the result to BigQuery. `overwrite` keeps the cell re-runnable: the
# default save mode raises once the table already exists.
# NOTE(review): no temporaryGcsBucket (or direct writeMethod) is configured in
# the visible cells; the connector's default indirect write needs one —
# confirm before running.
(
    task_1.write.format('bigquery')
    .option('table', 'homework_modul6.task_1')
    .mode('overwrite')
    .save()
)

In [None]:
# Find the longest trip (in minutes) for each pickup day.
# The CTE converts both timestamps to epoch seconds and divides the
# difference by 60; the outer query keeps the per-day maximum, ordered by date.
query = """
    with duration_cte as (
        select
            to_date(pickup_datetime) as date,
            ((bigint(to_timestamp(dropOff_datetime))) - (bigint(to_timestamp(pickup_datetime))))/60 as durations
        from fhv
    )
    select date, max(durations) as max_duration_minutes 
    from duration_cte
    group by 1
    order by 1
    """

task_2 = spark.sql(query)
task_2.show()
task_2.printSchema()

# Write the result to BigQuery. `overwrite` keeps the cell re-runnable: the
# default save mode raises once the table already exists.
# NOTE(review): no temporaryGcsBucket (or direct writeMethod) is configured in
# the visible cells; the connector's default indirect write needs one —
# confirm before running.
(
    task_2.write.format('bigquery')
    .option('table', 'homework_modul6.task_2')
    .mode('overwrite')
    .save()
)

In [None]:
# Find the five most frequent `dispatching_base_num` values.
# Rows are grouped by base number and ranked by their row count, descending.
query = """
    select dispatching_base_num
    , count(*) as count
    from fhv
    group by 1
    order by 2 desc
    limit 5
    """

task_3 = spark.sql(query)
task_3.show()
task_3.printSchema()

# Write the result to BigQuery. `overwrite` keeps the cell re-runnable: the
# default save mode raises once the table already exists.
# NOTE(review): no temporaryGcsBucket (or direct writeMethod) is configured in
# the visible cells; the connector's default indirect write needs one —
# confirm before running.
(
    task_3.write.format('bigquery')
    .option('table', 'homework_modul6.task_3')
    .mode('overwrite')
    .save()
)

In [None]:
# Find the five most common (PUlocationID, DOlocationID) pairs.
# Trips missing either location ID are excluded before counting.
query = """
    select 
        PUlocationID
        , DOlocationID
        , count(*) as count
    from fhv
    where PUlocationID is not null and DOlocationID is not null
    group by 1,2
    order by 3 desc
    limit 5
    """

task_4 = spark.sql(query)
task_4.show()
task_4.printSchema()

# Write the result to BigQuery. `overwrite` keeps the cell re-runnable: the
# default save mode raises once the table already exists.
# NOTE(review): no temporaryGcsBucket (or direct writeMethod) is configured in
# the visible cells; the connector's default indirect write needs one —
# confirm before running.
(
    task_4.write.format('bigquery')
    .option('table', 'homework_modul6.task_4')
    .mode('overwrite')
    .save()
)