In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m2.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=5cd63f4efea01cd38b525f5e47990d7f648da1d570c1c8e91bc483110457a9d1
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [10]:
import sys
from pyspark import *
from pyspark.sql import SparkSession
from datetime import datetime

# Input/output locations and the criticality threshold.
# In the standalone script these come from the command line (args[0]..args[3]);
# in the notebook they are fixed here.
inputPath = "registerSample.csv"   # args[0]: station readings
inputPath2 = "stations.csv"        # args[1]: station coordinates
threshold = 0.6                    # args[2]: criticality threshold
outputPath = "out_Lab8DF"          # args[3]: output folder

# Obtain (or reuse) the Spark session - standalone version.
spark = (
    SparkSession.builder
    .appName("Spark Lab #7 - Template")
    .getOrCreate()
)

# Load the tab-separated readings file (it has a header row) with schema inference.
# Expected schema:
# |-- station: integer (nullable = true)
# |-- timestamp: timestamp (nullable = true)
# |-- used_slots: integer (nullable = true)
# |-- free_slots: integer (nullable = true)
inputDF = spark.read.csv(inputPath, sep="\\t", header=True, inferSchema=True)

# Discard readings where both free_slots and used_slots are 0.
filteredDF = inputDF.filter((inputDF["free_slots"] != 0) | (inputDF["used_slots"] != 0))


# Python implementation of the UDF full(free_slots):
# -- 1 when the station is full (free_slots == 0)
# -- 0 otherwise
def fullFunction(free_slots):
    """Return 1 if ``free_slots`` equals 0 (station full), otherwise 0.

    Any value that does not compare equal to 0 (a positive count, ``None``)
    yields 0.
    """
    return 1 if free_slots == 0 else 0


# Register fullFunction as the SQL UDF "full" returning an integer.
# The return type must be passed explicitly: without it PySpark registers a
# Python UDF as StringType, so full(...) would produce the strings "1"/"0"
# instead of the integer fullstatus column documented below.
spark.udf.register("full", fullFunction, "int")


from pyspark.sql import functions as F

# Define a DataFrame with the following schema:
# |-- station: integer (nullable = true)
# |-- dayofweek: string (nullable = true)
# |-- hour: integer (nullable = true)
# |-- fullstatus: integer (nullable = true) - 1 = full, 0 = non-full
stationWeekdayHourDF = filteredDF.selectExpr(
    "station",
    "date_format(timestamp,'EE') as dayofweek",
    "hour(timestamp) as hour",
    "full(free_slots) as fullstatus",
)

# Define one group for each combination "station, dayofweek, hour"
rgdStationWeekdayHourDF = stationWeekdayHourDF.groupBy("station", "dayofweek", "hour")

# Compute the criticality for each group (station, dayofweek, hour),
# i.e., for each pair (station, timeslot): the average of fullstatus.
# The aggregate is named explicitly with alias(). The previous approach renamed
# the auto-generated "avg(fullStatus)" column, which only matched the actual
# column "fullstatus" because Spark resolves names case-insensitively by
# default (spark.sql.caseSensitive=false).
stationWeekdayHourCriticalityDF = rgdStationWeekdayHourDF.agg(
    F.avg("fullstatus").alias("criticality")
)

# Select only the lines with criticality > threshold.
# float() validates threshold, which would be a string if read from args[2].
selectedPairsDF = stationWeekdayHourCriticalityDF.filter(
    F.col("criticality") > float(threshold)
)

# Load the tab-separated station registry (it has a header row) with schema inference.
# Expected schema:
# |-- id: integer (nullable = true)
# |-- longitude: double (nullable = true)
# |-- latitude: double (nullable = true)
# |-- name: string (nullable = true)
stationsDF = spark.read.csv(inputPath2, sep="\\t", header=True, inferSchema=True)

# Attach the station coordinates to every critical (station, timeslot) pair
# via an inner join on the station identifier.
selectedPairsCoordinatesDF = selectedPairsDF.join(
    stationsDF,
    on=selectedPairsDF["station"] == stationsDF["id"],
    how="inner",
)

from pyspark.sql import functions as F

# Keep the output columns and sort by criticality (desc), then station,
# dayofweek and hour (asc). dayofweek is a string, so it sorts alphabetically.
selectedPairsIdCoordinatesCriticalityDF = (
    selectedPairsCoordinatesDF
    .select("station", "dayofweek", "hour", "longitude", "latitude", "criticality")
    .orderBy(F.col("criticality").desc(), "station", "dayofweek", "hour")
)

# Overwrite any previous output so the cell can be re-run: the default save
# mode raises an error when outputPath already exists.
(
    selectedPairsIdCoordinatesCriticalityDF.write
    .format("csv")
    .option("header", True)
    .mode("overwrite")
    .save(outputPath)
)

# Close the Spark session
spark.stop()



In [12]:
import sys
from pyspark import *
from pyspark.sql import SparkSession
from datetime import datetime

#################################
# Task 1
#################################

# Input/output locations and the criticality threshold.
# In the standalone script these come from the command line (args[0]..args[3]);
# in the notebook they are fixed here.
inputPath = "registerSample.csv"   # args[0]: station readings
inputPath2 = "stations.csv"        # args[1]: station coordinates
threshold = 0.6                    # args[2]: criticality threshold
outputPath = "out_Lab8DF_SQL"      # args[3]: output folder

# Obtain (or reuse) the Spark session - standalone version.
spark = (
    SparkSession.builder
    .appName("Spark Lab #7 - Template")
    .getOrCreate()
)

# Load the tab-separated readings file (it has a header row) with schema inference.
# Expected schema:
# |-- station: integer (nullable = true)
# |-- timestamp: timestamp (nullable = true)
# |-- used_slots: integer (nullable = true)
# |-- free_slots: integer (nullable = true)
inputDF = spark.read.csv(inputPath, sep="\\t", header=True, inferSchema=True)

# Expose the readings to Spark SQL under the table name "readings".
inputDF.createOrReplaceTempView("readings")

# Python implementation of the UDF full(free_slots):
# -- 1 when the station is full (free_slots == 0)
# -- 0 otherwise
def fullFunction(free_slots):
    """Return 1 if ``free_slots`` equals 0 (station full), otherwise 0.

    Any value that does not compare equal to 0 (a positive count, ``None``)
    yields 0.
    """
    return 1 if free_slots == 0 else 0

# Register fullFunction as the SQL UDF "full" returning an integer.
# The return type must be passed explicitly: without it PySpark registers a
# Python UDF as StringType, so full(...) would produce the strings "1"/"0"
# and avg(full(...)) would silently rely on an implicit string-to-double cast.
spark.udf.register("full", fullFunction, "int")

# Select only the lines with free_slots<>0 or used_slots<>0, then compute the
# criticality of each group (station, dayofweek, hour) - i.e., of each pair
# (station, timeslot) - as the average of full(free_slots), and finally keep
# only the groups with criticality > threshold.
#
# The schema of the returned dataset is:
# |-- station: integer (nullable = true)
# |-- dayofweek: string (nullable = true)
# |-- hour: integer (nullable = true)
# |-- criticality: double (nullable = true)
#
# threshold is spliced into the SQL text, so it goes through float() first:
# only a numeric literal can end up in the query, even if threshold were
# read as a string from args[2].
selectedPairsDF = spark.sql(f"""
    SELECT station,
           date_format(timestamp, 'EE') AS dayofweek,
           hour(timestamp) AS hour,
           avg(full(free_slots)) AS criticality
    FROM readings
    WHERE free_slots <> 0 OR used_slots <> 0
    GROUP BY station, date_format(timestamp, 'EE'), hour(timestamp)
    HAVING avg(full(free_slots)) > {float(threshold)}
""")

# Expose the critical pairs to Spark SQL under the table name "criticals".
selectedPairsDF.createOrReplaceTempView("criticals")

# Load the tab-separated station registry (it has a header row) with schema inference.
# Expected schema:
# |-- id: integer (nullable = true)
# |-- longitude: double (nullable = true)
# |-- latitude: double (nullable = true)
# |-- name: string (nullable = true)
stationsDF = spark.read.csv(inputPath2, sep="\\t", header=True, inferSchema=True)

# Expose the stations to Spark SQL under the table name "stations".
stationsDF.createOrReplaceTempView("stations")


# Join the selected critical "situations" with the stations table to
# retrieve the coordinates of the stations.
# Output columns: station, dayofweek, hour, longitude, latitude, criticality.
# Rows are sorted by criticality (desc), then station, dayofweek, hour (asc);
# dayofweek is a string, so it sorts alphabetically.
selectedPairsIdCoordinatesCriticalityDF = spark.sql("""
    SELECT c.station, c.dayofweek, c.hour,
           s.longitude, s.latitude, c.criticality
    FROM criticals c
    JOIN stations s ON c.station = s.id
    ORDER BY criticality DESC, station, dayofweek, hour
""")

# Overwrite any previous output so the cell can be re-run: the default save
# mode raises an error when outputPath already exists.
(
    selectedPairsIdCoordinatesCriticalityDF.write
    .format("csv")
    .option("header", True)
    .mode("overwrite")
    .save(outputPath)
)


# Close the Spark session
spark.stop()



