### Import used library 
To build the Spark Session, we need to import the library and load the pyspark function

In [1]:
import os
from pyspark.sql.functions import *
from pyspark.sql import SparkSession

In [2]:
spark = (
    SparkSession\
    .builder\
    .master('local[*]')\
    .appName('my_spark_app')\
    .enableHiveSupport()\
    .getOrCreate())

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/gilangrama13/hadoop/spark3-hadoop3/jars/log4j-slf4j-impl-2.17.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/gilangrama13/hadoop/hadoop/share/hadoop/common/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/gilangrama13/hadoop/hive/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]


2022-09-29T11:40:53,183 WARN [main] org.apache.spark.util.Utils - Your hostname, LAPTOP-T7AK0MN7 resolves to a loopback address: 127.0.1.1; using 172.29.178.233 instead (on interface eth0)
2022-09-29T11:40:53,188 WARN [main] org.apache.spark.util.Utils - Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


2022-09-29T11:41:04,345 WARN [Thread-4] org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
spark.sql("show databases").show()

2022-09-29T11:43:58,580 INFO [Thread-4] org.apache.hadoop.hive.conf.HiveConf - Found configuration file file:/home/gilangrama13/hadoop/hive/conf/hive-site.xml


Hive Session ID = eec526fb-72d3-4934-9e94-78c23eada66f


2022-09-29T11:43:58,803 INFO [Thread-4] SessionState - Hive Session ID = eec526fb-72d3-4934-9e94-78c23eada66f
2022-09-29T11:43:59,008 INFO [Thread-4] org.apache.hadoop.hive.metastore.HiveMetaStoreClient - Trying to connect to metastore with URI thrift://localhost:9083
2022-09-29T11:43:59,043 INFO [Thread-4] org.apache.hadoop.hive.metastore.HiveMetaStoreClient - Opened a connection to metastore, current connections: 1
2022-09-29T11:43:59,181 INFO [Thread-4] org.apache.hadoop.hive.metastore.HiveMetaStoreClient - Connected to metastore.
2022-09-29T11:43:59,181 INFO [Thread-4] org.apache.hadoop.hive.metastore.RetryingMetaStoreClient - RetryingMetaStoreClient proxy=class org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient ugi=gilangrama13 (auth:SIMPLE) retries=1 delay=1 lifetime=0
+-----------------+
|        namespace|
+-----------------+
|          default|
|user_warehouse_db|
+-----------------+



### Load data
By default, spark can load data from **HDFS**, if you want to load from local data you have to add `file://` as prefix.

In [5]:
def loadLocalData(dataName):
    current_dir = os.getcwd()
    datapath = "file://{}/../data/{}".format(current_dir, dataName)
    return spark.read.csv(datapath, header=True, sep=",")

In [6]:
cycle_stations = loadLocalData("cycle_stations.csv")
cycle_hire = loadLocalData("cycle_hire.csv")

                                                                                

### Explore Data
The cell below are some example of data exploration using __PySpark__

In [22]:
cycle_stations.printSchema()

root
 |-- id: string (nullable = true)
 |-- installed: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- locked: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- name: string (nullable = true)
 |-- bikes_count: string (nullable = true)
 |-- docks_count: string (nullable = true)
 |-- nbEmptyDocks: string (nullable = true)
 |-- temporary: string (nullable = true)
 |-- terminal_name: string (nullable = true)
 |-- install_date: string (nullable = true)
 |-- removal_date: string (nullable = true)



In [23]:
cycle_stations.count()

785

In [25]:
cycle_hire.printSchema()

root
 |-- rental_id: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- bike_id: string (nullable = true)
 |-- end_date: string (nullable = true)
 |-- end_station_id: string (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- start_date: string (nullable = true)
 |-- start_station_id: string (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- end_station_logical_terminal: string (nullable = true)
 |-- start_station_logical_terminal: string (nullable = true)
 |-- end_station_priority_id: string (nullable = true)



In [24]:
cycle_hire.count()

20000

In [54]:
cycle_stations.show(10)

+---+---------+-----------+------+------------+--------------------+-----------+-----------+------------+---------+-------------+------------+------------+
| id|installed|   latitude|locked|   longitude|                name|bikes_count|docks_count|nbEmptyDocks|temporary|terminal_name|install_date|removal_date|
+---+---------+-----------+------+------------+--------------------+-----------+-----------+------------+---------+-------------+------------+------------+
|789|     true|  51.538718| false|-0.011889482|Podium, Queen Eli...|         40|         40|           0|    false|       300021|  2016-01-11|        null|
|819|     true|   51.50621| false|   -0.114842|Belvedere Road 2,...|         38|         38|           0|    false|       300246|  2016-10-13|        null|
|334|     true|  51.505044| false|   -0.115851|Concert Hall Appr...|         10|         10|           0|    false|         2635|  2010-07-26|        null|
|171|     true| 51.4916156| false|-0.186753859|Collingham Garden

In [28]:
cycle_hire.show(10)

+---------+--------+-------+--------------------+--------------+--------------------+--------------------+----------------+--------------------+----------------------------+------------------------------+-----------------------+
|rental_id|duration|bike_id|            end_date|end_station_id|    end_station_name|          start_date|start_station_id|  start_station_name|end_station_logical_terminal|start_station_logical_terminal|end_station_priority_id|
+---------+--------+-------+--------------------+--------------+--------------------+--------------------+----------------+--------------------+----------------------------+------------------------------+-----------------------+
| 57920939|     300|   2342|2016-09-02T08:03:00Z|          null|Brushfield Street...|2016-09-02T07:58:00Z|            null|Selby Street, Whi...|                        1161|                        200135|                      1|
| 58006930|    2100|   4105|2016-09-05T00:25:00Z|          null|Horseferry Road, ...

In [55]:
cycle_hire.groupBy("start_station_name", "end_station_name")\
    .agg(avg("duration").alias("avg_duration"), count(lit(1)).alias("count"))\
    .where("count > 20")\
    .orderBy(["count"], ascending =[0])\
    .show(50, truncate=False)

+-------------------------------------------+-----------------------------------------+------------------+-----+
|start_station_name                         |end_station_name                         |avg_duration      |count|
+-------------------------------------------+-----------------------------------------+------------------+-----+
|Fulham Broadway, Walham Green              |Sandilands Road, Walham Green            |321.8181818181818 |99   |
|Erin Close, Walham Green                   |Sandilands Road, Walham Green            |276.27906976744185|86   |
|Kensington Olympia Station, Olympia        |Phillimore Gardens, Kensington           |342.2950819672131 |61   |
|York Hall, Bethnal Green                   |Pritchard's Road, Bethnal Green          |370.0             |60   |
|Shoreditch High Street, Shoreditch         |Pritchard's Road, Bethnal Green          |420.0             |58   |
|Hoxton Station, Hoxton                     |Pritchard's Road, Bethnal Green          |377.89473

In [70]:
cycle_hire.alias("hires").join(cycle_stations.alias("stationsA"), col("hires.start_station_name") == col("stationsA.name"), "inner")\
    .join(cycle_stations.alias("stationsB"), col("hires.end_station_name") == col("stationsB.name"), "inner")\
    .select(col("hires.rental_id"), col("hires.start_station_name"), col("stationsA.id").alias("start_station_id"), 
    col("hires.end_station_name"), col("stationsB.id").alias("end_station_id"),
    col("hires.duration"), col("hires.start_date"), col("hires.end_date"))\
    .show()

+---------+--------------------+----------------+--------------------+--------------+--------+--------------------+--------------------+
|rental_id|  start_station_name|start_station_id|    end_station_name|end_station_id|duration|          start_date|            end_date|
+---------+--------------------+----------------+--------------------+--------------+--------+--------------------+--------------------+
| 57920939|Selby Street, Whi...|             565|Brushfield Street...|           251|     300|2016-09-02T07:58:00Z|2016-09-02T08:03:00Z|
| 58006930|Tavistock Street,...|             335|Horseferry Road, ...|           221|    2100|2016-09-04T23:50:00Z|2016-09-05T00:25:00Z|
| 58072259|Pott Street, Beth...|             479|Wellington Row, B...|           533|     420|2016-09-06T20:10:00Z|2016-09-06T20:17:00Z|
| 58002916|Ashley Place, Vic...|             177|Somerset House, S...|           564|     780|2016-09-04T19:44:00Z|2016-09-04T19:57:00Z|
| 58040725|Belgrove Street ,...|         

In [8]:
cycle_hire.alias("hires").join(cycle_stations.alias("stationsA"), col("hires.start_station_name") == col("stationsA.name"), "inner")\
    .join(cycle_stations.alias("stationsB"), col("hires.end_station_name") == col("stationsB.name"), "inner")\
    .select(col("hires.rental_id"), col("hires.start_station_name"), col("stationsA.id").alias("start_station_id"), 
    col("hires.end_station_name"), col("stationsB.id").alias("end_station_id"),
    col("hires.duration"), col("hires.start_date"), col("hires.end_date"))\
    .write.mode("overwrite")\
    .saveAsTable("user_warehouse_db.cycle_used")

                                                                                

2022-09-29T11:44:43,049 INFO [Thread-4] org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAccessController - Created SQLStdHiveAccessController for session context : HiveAuthzSessionContext [sessionString=eec526fb-72d3-4934-9e94-78c23eada66f, clientType=HIVECLI]
2022-09-29T11:44:43,053 WARN [Thread-4] org.apache.hadoop.hive.ql.session.SessionState - METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
2022-09-29T11:44:43,053 INFO [Thread-4] org.apache.hadoop.hive.metastore.HiveMetaStoreClient - Mestastore configuration metastore.filter.hook changed from org.apache.hadoop.hive.metastore.DefaultMetaStoreFilterHookImpl to org.apache.hadoop.hive.ql.security.authorization.plugin.AuthorizationMetaStoreFilterHook
2022-09-29T11:44:43,059 INFO [Thread-4] org.apache.hadoop.hive.metastore.HiveMetaStoreClient - Closed a connection to metastore, current connections: 0
2022-09-29T11:44:43,060 INFO [Thread

In [10]:
spark.sql("select count(*) from user_warehouse_db.cycle_used").show()

2022-09-29T13:44:53,864 INFO [Thread-4] org.apache.hadoop.hive.metastore.HiveMetaStoreClient - Trying to connect to metastore with URI thrift://localhost:9083
2022-09-29T13:44:53,865 INFO [Thread-4] org.apache.hadoop.hive.metastore.HiveMetaStoreClient - Opened a connection to metastore, current connections: 2
2022-09-29T13:44:53,991 INFO [Thread-4] org.apache.hadoop.hive.metastore.HiveMetaStoreClient - Connected to metastore.
2022-09-29T13:44:53,991 INFO [Thread-4] org.apache.hadoop.hive.metastore.RetryingMetaStoreClient - RetryingMetaStoreClient proxy=class org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient ugi=gilangrama13 (auth:SIMPLE) retries=1 delay=1 lifetime=0




+--------+
|count(1)|
+--------+
|   17850|
+--------+



                                                                                

In [74]:
spark.sql("select * from user_warehouse_db.cycle_used").show()

+---------+--------------------+----------------+--------------------+--------------+--------+--------------------+--------------------+
|rental_id|  start_station_name|start_station_id|    end_station_name|end_station_id|duration|          start_date|            end_date|
+---------+--------------------+----------------+--------------------+--------------+--------+--------------------+--------------------+
| 57920939|Selby Street, Whi...|             565|Brushfield Street...|           251|     300|2016-09-02T07:58:00Z|2016-09-02T08:03:00Z|
| 58006930|Tavistock Street,...|             335|Horseferry Road, ...|           221|    2100|2016-09-04T23:50:00Z|2016-09-05T00:25:00Z|
| 58072259|Pott Street, Beth...|             479|Wellington Row, B...|           533|     420|2016-09-06T20:10:00Z|2016-09-06T20:17:00Z|
| 58002916|Ashley Place, Vic...|             177|Somerset House, S...|           564|     780|2016-09-04T19:44:00Z|2016-09-04T19:57:00Z|
| 58040725|Belgrove Street ,...|         

In [10]:
spark.stop()