In [382]:
import os
import pandas as pd

# Set the SPARK_HOME environment variable to the local Spark installation directory.
# A raw string keeps the Windows backslashes literal: in a normal string "\s" is an
# invalid escape sequence, which newer Python versions warn about.
os.environ["SPARK_HOME"] = r"C:\spark\spark-3.1.2-bin-hadoop2.7"

In [383]:
# findspark.init() is called with no arguments, so it presumably locates Spark
# through the SPARK_HOME environment variable set in the previous cell — TODO confirm.
import findspark
findspark.init()

In [384]:
# Build a Spark session in local mode (or reuse one that already exists),
# with the Spark UI configured on port 4050.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master('local[*]')
    .appName("Starting a new Spark Session...")
    .config('spark.ui.port', '4050')
    .getOrCreate()
)

In [385]:
# Evaluate the session object as the cell's last expression so the notebook
# displays it, confirming the session was created.
spark

In [386]:
# Load the trips CSV into a Spark DataFrame, letting Spark infer column types,
# and show the number of rows read.
# NOTE(review): no header option is passed, so the file's header line is read as
# an ordinary row and the columns get the default names _c0.._c4.
path = 'trips.csv'
trips = (
    spark.read
    .option("sep", ',')
    .option("inferSchema", True)
    .csv(path)
)
trips.count()

101

In [387]:
# Display every loaded row as a pandas DataFrame.
# toPandas() already collects the whole DataFrame, so wrapping it in
# limit(trips.count()) only added an extra Spark count job before the collect.
trips.toPandas()

Unnamed: 0,_c0,_c1,_c2,_c3,_c4
0,region,origin_coord,destination_coord,datetime,datasource
1,Prague,POINT (14.4973794438195 50.00136875782316),POINT (14.43109483523328 50.04052930943246),2018-05-28 09:03:40,funny_car
2,Turin,POINT (7.672837913286881 44.9957109242058),POINT (7.720368637535126 45.06782385393849),2018-05-21 02:54:04,baba_car
3,Prague,POINT (14.32427345662177 50.00002074358429),POINT (14.47767895969969 50.09339790740321),2018-05-13 08:52:25,cheap_mobile
4,Turin,POINT (7.541509189114433 45.09160503827746),POINT (7.74528653441973 45.02628598341506),2018-05-06 09:49:16,bad_diesel_vehicles
...,...,...,...,...,...
96,Prague,POINT (14.33562319852013 50.05977285737693),POINT (14.45302412886982 50.06961029075634),2018-05-03 18:56:45,cheap_mobile
97,Hamburg,POINT (9.996714798980491 53.52203690589671),POINT (10.17431393081631 53.51796499041119),2018-05-23 12:43:17,baba_car
98,Prague,POINT (14.40975521275597 50.037791514028),POINT (14.59895464921585 50.05472087955579),2018-05-12 08:13:09,cheap_mobile
99,Hamburg,POINT (10.08338857045871 53.59661344302611),POINT (10.17914017806172 53.60909301795856),2018-05-22 10:39:49,baba_car


In [388]:
# Replace Spark's default positional column names (_c0, _c1, ...) with the
# names from the CSV header, then show the resulting column list.
tripsColName = [
    'region',
    'origin_coord',
    'destination_coord',
    'datetime',
    'datasource',
]

for position, new_name in enumerate(tripsColName):
    trips = trips.withColumnRenamed("_c" + str(position), new_name)

trips.columns

['region', 'origin_coord', 'destination_coord', 'datetime', 'datasource']

In [389]:
# Print the schema to confirm the renamed columns; the printed output shows
# all five columns typed as string at this stage.
trips.printSchema()

root
 |-- region: string (nullable = true)
 |-- origin_coord: string (nullable = true)
 |-- destination_coord: string (nullable = true)
 |-- datetime: string (nullable = true)
 |-- datasource: string (nullable = true)



In [390]:
# Cast the datetime column from string to timestamp and re-print the schema.
# Values that are not valid timestamps (such as the header row still present
# in the data at this point) come out as null.
trips = trips.withColumn("datetime", trips.datetime.cast("timestamp"))
trips.printSchema()

root
 |-- region: string (nullable = true)
 |-- origin_coord: string (nullable = true)
 |-- destination_coord: string (nullable = true)
 |-- datetime: timestamp (nullable = true)
 |-- datasource: string (nullable = true)



In [391]:
# Display every row again after the rename and timestamp cast.
# toPandas() already collects the whole DataFrame, so wrapping it in
# limit(trips.count()) only added an extra Spark count job before the collect.
trips.toPandas()

Unnamed: 0,region,origin_coord,destination_coord,datetime,datasource
0,region,origin_coord,destination_coord,NaT,datasource
1,Prague,POINT (14.4973794438195 50.00136875782316),POINT (14.43109483523328 50.04052930943246),2018-05-28 09:03:40,funny_car
2,Turin,POINT (7.672837913286881 44.9957109242058),POINT (7.720368637535126 45.06782385393849),2018-05-21 02:54:04,baba_car
3,Prague,POINT (14.32427345662177 50.00002074358429),POINT (14.47767895969969 50.09339790740321),2018-05-13 08:52:25,cheap_mobile
4,Turin,POINT (7.541509189114433 45.09160503827746),POINT (7.74528653441973 45.02628598341506),2018-05-06 09:49:16,bad_diesel_vehicles
...,...,...,...,...,...
96,Prague,POINT (14.33562319852013 50.05977285737693),POINT (14.45302412886982 50.06961029075634),2018-05-03 18:56:45,cheap_mobile
97,Hamburg,POINT (9.996714798980491 53.52203690589671),POINT (10.17431393081631 53.51796499041119),2018-05-23 12:43:17,baba_car
98,Prague,POINT (14.40975521275597 50.037791514028),POINT (14.59895464921585 50.05472087955579),2018-05-12 08:13:09,cheap_mobile
99,Hamburg,POINT (10.08338857045871 53.59661344302611),POINT (10.17914017806172 53.60909301795856),2018-05-22 10:39:49,baba_car


In [392]:
# Derive the week number of the year from the trip timestamp so that trips
# can later be grouped per week, then preview the result.
from pyspark.sql.functions import weekofyear

trips = trips.withColumn("week_of_year", weekofyear(trips["datetime"]))
trips.show()

+-------+--------------------+--------------------+-------------------+-------------------+------------+
| region|        origin_coord|   destination_coord|           datetime|         datasource|week_of_year|
+-------+--------------------+--------------------+-------------------+-------------------+------------+
| region|        origin_coord|   destination_coord|               null|         datasource|        null|
| Prague|POINT (14.4973794...|POINT (14.4310948...|2018-05-28 09:03:40|          funny_car|          22|
|  Turin|POINT (7.67283791...|POINT (7.72036863...|2018-05-21 02:54:04|           baba_car|          21|
| Prague|POINT (14.3242734...|POINT (14.4776789...|2018-05-13 08:52:25|       cheap_mobile|          19|
|  Turin|POINT (7.54150918...|POINT (7.74528653...|2018-05-06 09:49:16|bad_diesel_vehicles|          18|
|  Turin|POINT (7.61407811...|POINT (7.52749714...|2018-05-23 12:45:54|      pt_search_app|          21|
|Hamburg|POINT (10.0729902...|POINT (9.78919760...|2018

In [393]:
# Drop the CSV header line (it was loaded as a data row) and add a date-only
# column derived from the trip timestamp.
# col and to_date are imported here because no earlier cell imports them; on a
# fresh kernel the cell would otherwise fail with a NameError.
from pyspark.sql.functions import col, to_date

# Keep only rows where none of the three columns equals its own column name.
trips = trips.filter(
    (trips.region != 'region')
    & (trips.origin_coord != 'origin_coord')
    & (trips.destination_coord != 'destination_coord')
)

# Reference the column by its actual lower-case name.
trips = trips.withColumn("date_only", to_date(col("datetime")))

# toPandas() already collects every row; limit(trips.count()) was redundant.
trips.toPandas()

Unnamed: 0,region,origin_coord,destination_coord,datetime,datasource,week_of_year,date_only
0,Prague,POINT (14.4973794438195 50.00136875782316),POINT (14.43109483523328 50.04052930943246),2018-05-28 09:03:40,funny_car,22,2018-05-28
1,Turin,POINT (7.672837913286881 44.9957109242058),POINT (7.720368637535126 45.06782385393849),2018-05-21 02:54:04,baba_car,21,2018-05-21
2,Prague,POINT (14.32427345662177 50.00002074358429),POINT (14.47767895969969 50.09339790740321),2018-05-13 08:52:25,cheap_mobile,19,2018-05-13
3,Turin,POINT (7.541509189114433 45.09160503827746),POINT (7.74528653441973 45.02628598341506),2018-05-06 09:49:16,bad_diesel_vehicles,18,2018-05-06
4,Turin,POINT (7.614078119815749 45.13433106465422),POINT (7.527497142312585 45.03335051325654),2018-05-23 12:45:54,pt_search_app,21,2018-05-23
...,...,...,...,...,...,...,...
95,Prague,POINT (14.33562319852013 50.05977285737693),POINT (14.45302412886982 50.06961029075634),2018-05-03 18:56:45,cheap_mobile,18,2018-05-03
96,Hamburg,POINT (9.996714798980491 53.52203690589671),POINT (10.17431393081631 53.51796499041119),2018-05-23 12:43:17,baba_car,21,2018-05-23
97,Prague,POINT (14.40975521275597 50.037791514028),POINT (14.59895464921585 50.05472087955579),2018-05-12 08:13:09,cheap_mobile,19,2018-05-12
98,Hamburg,POINT (10.08338857045871 53.59661344302611),POINT (10.17914017806172 53.60909301795856),2018-05-22 10:39:49,baba_car,21,2018-05-22


In [394]:
# Print the schema to confirm the derived week_of_year and date_only columns
# are present alongside the original five.
trips.printSchema()

root
 |-- region: string (nullable = true)
 |-- origin_coord: string (nullable = true)
 |-- destination_coord: string (nullable = true)
 |-- datetime: timestamp (nullable = true)
 |-- datasource: string (nullable = true)
 |-- week_of_year: integer (nullable = true)
 |-- date_only: date (nullable = true)



In [395]:
# Count the trips that share the same origin, destination and calendar date
(
    trips
    .groupBy("origin_coord", "destination_coord", "date_only")
    .count()
    .show()
)

+--------------------+--------------------+----------+-----+
|        origin_coord|   destination_coord| date_only|count|
+--------------------+--------------------+----------+-----+
|POINT (7.72380775...|POINT (7.59287452...|2018-05-24|    1|
|POINT (10.2154964...|POINT (10.2054465...|2018-05-18|    1|
|POINT (14.6429258...|POINT (14.5017970...|2018-05-17|    1|
|POINT (14.3310720...|POINT (14.3200468...|2018-05-20|    1|
|POINT (14.3514619...|POINT (14.3588281...|2018-05-21|    1|
|POINT (7.62504114...|POINT (7.73895376...|2018-05-26|    1|
|POINT (10.2004416...|POINT (9.85751885...|2018-05-05|    1|
|POINT (14.4251434...|POINT (14.4058508...|2018-05-02|    1|
|POINT (14.4240604...|POINT (14.3165732...|2018-05-29|    1|
|POINT (10.1541092...|POINT (10.1041201...|2018-05-27|    1|
|POINT (7.61407811...|POINT (7.52749714...|2018-05-23|    1|
|POINT (14.5272324...|POINT (14.6653023...|2018-05-04|    1|
|POINT (9.80304883...|POINT (9.83488382...|2018-05-09|    1|
|POINT (7.73282848...|PO

In [396]:
# Count the trips per week of the year and destination coordinate
(
    trips
    .groupBy("week_of_year", "destination_coord")
    .count()
    .show()
)

+------------+--------------------+-----+
|week_of_year|   destination_coord|count|
+------------+--------------------+-----+
|          20|POINT (14.3200468...|    1|
|          18|POINT (14.4409786...|    1|
|          21|POINT (14.6610239...|    1|
|          21|POINT (7.69874235...|    1|
|          19|POINT (14.6054480...|    1|
|          18|POINT (7.58356869...|    1|
|          20|POINT (7.55848129...|    1|
|          20|POINT (14.3479060...|    1|
|          19|POINT (7.63957800...|    1|
|          18|POINT (10.0588964...|    1|
|          22|POINT (7.70032410...|    1|
|          20|POINT (9.83414766...|    1|
|          18|POINT (14.5492671...|    1|
|          21|POINT (7.66250749...|    1|
|          20|POINT (9.78919760...|    1|
|          21|POINT (7.57970527...|    1|
|          20|POINT (14.6564849...|    1|
|          21|POINT (10.1791401...|    1|
|          19|POINT (7.51712993...|    1|
|          20|POINT (14.5576248...|    1|
+------------+--------------------

In [397]:
# Count the trips per week of the year and region
(
    trips
    .groupBy("week_of_year", "region")
    .count()
    .show()
)

+------------+-------+-----+
|week_of_year| region|count|
+------------+-------+-----+
|          22| Prague|    3|
|          20|Hamburg|    5|
|          19| Prague|    8|
|          21|Hamburg|    7|
|          19|  Turin|    6|
|          21| Prague|    5|
|          22|  Turin|    4|
|          20|  Turin|    6|
|          18| Prague|   10|
|          22|Hamburg|    5|
|          18|Hamburg|    5|
|          19|Hamburg|    6|
|          18|  Turin|    8|
|          21|  Turin|   14|
|          20| Prague|    8|
+------------+-------+-----+



In [398]:
# Load the IPython "sql" extension, which provides the %sql / %%sql magics
# used in the following cells (this does not open a database connection yet).
%load_ext sql

The sql extension is already loaded. To reload it, use:
  %reload_ext sql


In [399]:
from sqlalchemy import create_engine

In [400]:
# Connect the %sql magic to the "postgres" database on localhost:5432 as user "postgres".
# NOTE(review): the password is embedded in the connection URL in plain text;
# consider supplying it from outside the notebook.
%sql postgresql://postgres:Admin01@localhost:5432/postgres

In [401]:
%%sql
-- Create the target table for the trips on first run only
CREATE TABLE IF NOT EXISTS public.trips (
    region            VARCHAR(30),
    origin_coord      VARCHAR(100),
    destination_coord VARCHAR(100),
    datetime          TIMESTAMP,
    datasource        VARCHAR(30)
);

-- Show the rows currently stored in the table
SELECT * FROM public.trips;



 * postgresql://postgres:***@localhost:5432/postgres
Done.
100 rows affected.


region,origin_coord,destination_coord,datetime,datasource
Prague,POINT (14.4973794438195 50.00136875782316),POINT (14.43109483523328 50.04052930943246),2018-05-28 09:03:40,funny_car
Turin,POINT (7.672837913286881 44.9957109242058),POINT (7.720368637535126 45.06782385393849),2018-05-21 02:54:04,baba_car
Prague,POINT (14.32427345662177 50.00002074358429),POINT (14.47767895969969 50.09339790740321),2018-05-13 08:52:25,cheap_mobile
Turin,POINT (7.541509189114433 45.09160503827746),POINT (7.74528653441973 45.02628598341506),2018-05-06 09:49:16,bad_diesel_vehicles
Turin,POINT (7.614078119815749 45.13433106465422),POINT (7.527497142312585 45.03335051325654),2018-05-23 12:45:54,pt_search_app
Hamburg,POINT (10.07299025213017 53.62044974829032),POINT (9.789197601249002 53.46315765148751),2018-05-15 09:13:36,bad_diesel_vehicles
Hamburg,POINT (9.910278201788232 53.58386264717827),POINT (10.02557919725378 53.4120717767391),2018-05-13 13:09:19,funny_car
Turin,POINT (7.560785081962462 45.01901608530191),POINT (7.583568695710608 45.10526898076209),2018-05-06 00:00:44,cheap_mobile
Turin,POINT (7.702418079996892 45.05754972796922),POINT (7.623229346744799 44.99969774086024),2018-05-14 02:07:30,cheap_mobile
Hamburg,POINT (10.05260098579818 53.53497739746809),POINT (10.05889649564977 53.49486429314853),2018-05-04 00:46:12,cheap_mobile


In [402]:
import os

import psycopg2
import pandas as pd

# Open a psycopg2 connection and cursor for loading the trips.
# Connection settings are taken from the standard libpq environment variables
# (PGHOST, PGDATABASE, PGUSER, PGPASSWORD) when they are set; the fallbacks are
# the notebook's original local values, so existing setups keep working.
# NOTE(review): the fallback password is still stored in the notebook in plain
# text — remove it once the environment is configured.
conn = psycopg2.connect(
    host=os.environ.get("PGHOST", "localhost"),
    database=os.environ.get("PGDATABASE", "postgres"),
    user=os.environ.get("PGUSER", "postgres"),
    password=os.environ.get("PGPASSWORD", "Admin01"))

cursor = conn.cursor()


In [403]:
# Load every cleaned trip into public.trips through the open psycopg2 cursor.
# All rows are brought to the driver with collect() and inserted one at a time
# inside a single transaction.
# NOTE(review): rows are appended unconditionally and the table has no key, so
# running this cell again inserts the same trips a second time.
data_collect = trips.collect()

# The collected list already holds every row, so its length replaces a second
# Spark job that was run only to count them.
tot = len(data_collect)

print( "Loading ", tot, " records into the database")

INSERT_TRIP_SQL = "INSERT INTO public.trips(region,origin_coord, destination_coord, datetime, datasource) values (%s,%s,%s,%s,%s)"

n = 0
try:
    # looping through each row of the dataframe
    for n, row in enumerate(data_collect, start=1):
        # Fields are read by name so the insert does not depend on the column
        # order of the DataFrame (which also carries week_of_year and date_only).
        cursor.execute(
            INSERT_TRIP_SQL,
            (
                row["region"],
                row["origin_coord"],
                row["destination_coord"],
                row["datetime"],
                row["datasource"],
            ),
        )
        print( "Records loaded: ", n, end = "\r")
    conn.commit()
except Exception:
    # Undo the partial load so the table never keeps a half-inserted batch.
    conn.rollback()
    raise
finally:
    # Release the cursor and connection whether or not the load succeeded.
    cursor.close()
    conn.close()

print( "Finished            ")


Loading  100  records into the database
Records loaded:  1Records loaded:  2Records loaded:  3Records loaded:  4Records loaded:  5Records loaded:  6Records loaded:  7Records loaded:  8Records loaded:  9Records loaded:  10Records loaded:  11Records loaded:  12Records loaded:  13Records loaded:  14Records loaded:  15Records loaded:  16Records loaded:  17Records loaded:  18Records loaded:  19Records loaded:  20Records loaded:  21Records loaded:  22Records loaded:  23Records loaded:  24Records loaded:  25Records loaded:  26Records loaded:  27Records loaded:  28Records loaded:  29Records loaded:  30Records loaded:  31Records loaded:  32Records loaded:  33Records loaded:  34Records loaded:  35Records loaded:  36Records loaded:  37Records loaded:  38Records loaded:  39Records loaded:  40Records loaded:  41Records loaded:  42Records loaded:  43Records loaded:  44Records loaded:  45Records loaded:  46Records loaded:  47Records loaded:  48Records l