In [345]:
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql import DataFrameWriter

spark = SparkSession \
.builder \
.appName("plops_streaming") \
.getOrCreate()


In [346]:
database_name = 'occupancy'
hostname = 'ec2-52-39-242-144.us-west-2.compute.amazonaws.com'
url_connect = "jdbc:postgresql://{hostname}:5432/{db}".format(hostname=hostname, db=database_name)
properties = {"user":"spark_user", 
              "password":os.environ['POSTGRES_PASS'],
              "driver": "org.postgresql.Driver"
             }

In [347]:
from collections import OrderedDict
config_schema = OrderedDict()
config_schema = [
    ('data_id', 'INT'),
    ('meter_id', 'INT'),
    ('transaction_id', 'INT'),
    ('transaction_timestamp', 'STRING'),
    ('amount_usd', 'INT'),
    ('usernumber', 'STRING'),
    ('payment_mean', 'STRING'),
    ('paid_duration', 'INT'),
    ('station_id', 'INT'),
    ('year', 'INT'),
    ('month', 'INT'),
    ('vendor', 'STRING'),
]
schema = ", ".join(["{} {}".format(col, type) for col, type in config_schema])
schema

'data_id INT, meter_id INT, transaction_id INT, transaction_timestamp STRING, amount_usd INT, usernumber STRING, payment_mean STRING, paid_duration INT, station_id INT, year INT, month INT, vendor STRING'

In [348]:
tr_df = spark.read.csv(
    "s3a://project.datasets/transactions/01182019.csv.gz", header=True, mode="PERMISSIVE", schema=schema
)
tr_df.printSchema()
tr_df.show(1)

root
 |-- data_id: integer (nullable = true)
 |-- meter_id: integer (nullable = true)
 |-- transaction_id: integer (nullable = true)
 |-- transaction_timestamp: string (nullable = true)
 |-- amount_usd: integer (nullable = true)
 |-- usernumber: string (nullable = true)
 |-- payment_mean: string (nullable = true)
 |-- paid_duration: integer (nullable = true)
 |-- station_id: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- vendor: string (nullable = true)

+--------+--------+--------------+---------------------+----------+----------+------------+-------------+----------+----+-----+------+
| data_id|meter_id|transaction_id|transaction_timestamp|amount_usd|usernumber|payment_mean|paid_duration|station_id|year|month|vendor|
+--------+--------+--------------+---------------------+----------+----------+------------+-------------+----------+----+-----+------+
|26060471|19337010|     652611123|  01/12/2019 08:35:42|        19|      NULL

In [349]:
tr_df = tr_df.select('transaction_timestamp','station_id','paid_duration','amount_usd')
tr_df = tr_df.withColumn("transaction_timestamp", F.to_timestamp(tr_df.transaction_timestamp, format="MM/dd/yyyy HH:mm:ss"))
tr_df = tr_df.withColumn("transaction_endtime", (F.unix_timestamp("transaction_timestamp") + tr_df.paid_duration).cast('timestamp'))
tr_df.printSchema()

root
 |-- transaction_timestamp: timestamp (nullable = true)
 |-- station_id: integer (nullable = true)
 |-- paid_duration: integer (nullable = true)
 |-- amount_usd: integer (nullable = true)
 |-- transaction_endtime: timestamp (nullable = true)



In [350]:
tr_df = tr_df.withColumnRenamed("transaction_timestamp", "transaction_starttime")
tr_df = tr_df.select('station_id','transaction_starttime', 'transaction_endtime','paid_duration','amount_usd')
tr_df.printSchema()

root
 |-- station_id: integer (nullable = true)
 |-- transaction_starttime: timestamp (nullable = true)
 |-- transaction_endtime: timestamp (nullable = true)
 |-- paid_duration: integer (nullable = true)
 |-- amount_usd: integer (nullable = true)



In [351]:
table = "active_transactions"
mode = "overwrite"
my_writer = DataFrameWriter(tr_df)
my_writer.jdbc(url_connect, table, mode, properties)

In [352]:
tr_df.count()

207274

In [353]:
tr_df.createOrReplaceTempView("occupancy_streaming")

In [107]:
tr_df.printSchema()
tr_df.show(1)

root
 |-- station_id: integer (nullable = true)
 |-- transaction_starttime: timestamp (nullable = true)
 |-- transaction_endtime: timestamp (nullable = true)

+----------+---------------------+-------------------+
|station_id|transaction_starttime|transaction_endtime|
+----------+---------------------+-------------------+
|     10873|  2019-01-12 08:35:42|2019-01-12 14:22:02|
+----------+---------------------+-------------------+
only showing top 1 row



In [314]:
min_ds = spark.sql("select max(transaction_starttime) as latest_timestamp from occupancy_streaming")
min_ds = min_ds.withColumn("latest_timestamp", F.date_trunc('minute', min_ds.latest_timestamp)) 

min_ds = min_ds.withColumn("second_timestamp", (F.unix_timestamp(min_ds.latest_timestamp) - 60).cast("timestamp")) \
                    .withColumn("third_timestamp", (F.unix_timestamp(min_ds.latest_timestamp) - 120).cast("timestamp")) \
                    .collect()



In [315]:
print(min_ds[0][0])
print(min_ds[0][1])
print(min_ds[0][2])

2019-01-18 23:59:00
2019-01-18 23:58:00
2019-01-18 23:57:00


In [316]:
import os
database_name = 'occupancy'
hostname = 'ec2-52-39-242-144.us-west-2.compute.amazonaws.com'
url_connect = "jdbc:postgresql://{hostname}:5432/{db}".format(hostname=hostname, db=database_name)

properties = {"user":"spark_user", 
              "password":os.environ['POSTGRES_PASS'],
              "driver": "org.postgresql.Driver"
             }
table = "dim_stations"

In [336]:
dim_df = spark.read.jdbc(url=url_connect, table=table, properties=properties)
dim_df = dim_df.select('station_id').withColumn("timestamp", F.lit(min_ds[0][0]))
dim_df = dim_df.union(dim_df.withColumn("timestamp", F.lit(min_ds[0][1])))
dim_df = dim_df.union(dim_df.withColumn("timestamp", F.lit(min_ds[0][2])))

dim_df.printSchema()
dim_df.createOrReplaceTempView("occupancy_perminute")

root
 |-- station_id: integer (nullable = true)
 |-- timestamp: timestamp (nullable = false)



In [357]:
dim_df.count()

1668

In [380]:
sql = ("SELECT p.station_id, p.timestamp, count(*) AS occupied_spots "
       "FROM occupancy_perminute p left outer join occupancy_streaming s "
       "ON p.station_id = s.station_id "
       "AND p.timestamp BETWEEN s.transaction_starttime AND s.transaction_endtime "
       "GROUP BY p.station_id, p.timestamp"
      )

occupancy_per_minute = spark.sql(sql)

In [383]:
occupancy_per_minute.count()

1251

In [385]:
table = "live_occupancy"
mode = "overwrite"
my_writer = DataFrameWriter(occupancy_per_minute)
my_writer.jdbc(url_connect, table, mode, properties)