In [69]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, TimestampType
from pyspark.sql.types import ArrayType, DoubleType, BooleanType
from pyspark.sql.functions import col,array_contains, regexp_replace, split, to_timestamp, round, when, hour, count

# spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

In [45]:
# (Re)start the Spark session with an explicit executor configuration.
# Stop a session left over from a previous run first. The earlier
# `spark = ...getOrCreate()` line is commented out, so on a fresh kernel `spark`
# does not exist yet and an unconditional `spark.stop()` raises NameError.
_previous_spark_session = globals().get("spark")
if _previous_spark_session is not None:
    _previous_spark_session.stop()

# NOTE(review): executor-level settings below are presumably only honoured when
# running against a cluster manager rather than local mode — confirm for this setup.
spark = SparkSession.builder.appName("Test2") \
      .config("spark.dynamicAllocation.maxExecutors", "4") \
      .config("spark.executor.cores", "4") \
      .config("spark.executor.memory", "512m") \
      .config("spark.dynamicAllocation.enabled", "true") \
      .config("spark.dynamicAllocation.initialExecutors", "4") \
      .config("spark.executor.instances", "4") \
      .getOrCreate()

In [86]:

# Raw trips extract: semicolon-separated with a header row.
TRIPS_CSV_PATH = r'C:\Users\Visagio\Desktop\Jobsity challenge\trips.csv'
# Strips the WKT wrapper ("POINT ", "(", ")") from the coordinate columns, leaving "<lon> <lat>".
# Raw string: "\(" in a plain literal is an invalid escape sequence (SyntaxWarning on Python 3.12+).
POINT_WRAPPER_REGEX = r"POINT |\(|\)"

df = spark.read.csv(TRIPS_CSV_PATH, sep=';', header=True)

# Parse latitude and longitude out of the cleaned "<lon> <lat>" strings.
df_coord = df.withColumn("clean_origin", regexp_replace("origin_coord", POINT_WRAPPER_REGEX, "")) \
             .withColumn("clean_destiny", regexp_replace("destination_coord", POINT_WRAPPER_REGEX, ""))

# Split each cleaned coordinate once: item 0 is longitude, item 1 is latitude.
origin_parts = split(df_coord['clean_origin'], ' ')
destiny_parts = split(df_coord['clean_destiny'], ' ')
df_coord2 = df_coord.withColumn('origin_latitude', origin_parts.getItem(1).cast("float")) \
                    .withColumn('origin_longitude', origin_parts.getItem(0).cast("float")) \
                    .withColumn('destiny_latitude', destiny_parts.getItem(1).cast("float")) \
                    .withColumn('destiny_longitude', destiny_parts.getItem(0).cast("float"))

# Similar origins and destinations have to be grouped together, so coordinates
# are rounded to 1 decimal place (a 0.1-degree grid).
df_coord3 = df_coord2.withColumn("origin_latitude", round(col('origin_latitude'), 1)) \
                     .withColumn("origin_longitude", round(col('origin_longitude'), 1)) \
                     .withColumn("destiny_latitude", round(col('destiny_latitude'), 1)) \
                     .withColumn("destiny_longitude", round(col('destiny_longitude'), 1))

# Define the period of the day so similar trips are grouped together.
df_time1 = df_coord3.withColumn("trip_datetime", to_timestamp("datetime", "dd/MM/yyyy HH:mm"))

# Hours 0-5 -> Dawn, 6-11 -> Morning, 12-17 -> Afternoon, 18-23 -> Night.
# A null trip_datetime (missing or unparsed `datetime`) gives a null hour; every
# comparison is then null, so a bare .otherwise("Night") would mislabel those rows
# as Night. The explicit isNotNull() fallback leaves their period null instead.
trip_hour = hour(col("trip_datetime"))
df_time2 = df_time1.withColumn("period_of_day", when(trip_hour <= 5, "Dawn")
                                                .when(trip_hour <= 11, "Morning")
                                                .when(trip_hour <= 17, "Afternoon")
                                                .when(trip_hour.isNotNull(), "Night"))
print(df_time2.count())

# Count trips per region, rounded origin/destination cell and period of day.
df_grouped = df_time2.groupBy('region', 'origin_latitude', 'origin_longitude',
                              'destiny_latitude', 'destiny_longitude', 'period_of_day') \
                     .agg(count("*").alias("number_of_trips"))

print(df_grouped.count())
df_grouped.show()


100
93
+-------+---------------+----------------+----------------+-----------------+-------------+---------------+
| region|origin_latitude|origin_longitude|destiny_latitude|destiny_longitude|period_of_day|number_of_trips|
+-------+---------------+----------------+----------------+-----------------+-------------+---------------+
|  Turin|           45.1|             7.6|            45.1|              7.6|         Dawn|              2|
| Prague|           50.1|            14.3|            50.1|             14.5|        Night|              1|
| Prague|           50.0|            14.6|            50.1|             14.3|        Night|              1|
|  Turin|           45.1|             7.5|            45.0|              7.6|      Morning|              1|
|Hamburg|           53.5|             9.9|            53.5|             10.0|         Dawn|              1|
| Prague|           50.1|            14.4|            50.0|             14.4|         Dawn|              1|
|  Turin|           4

In [46]:
%%time
#test scalability

df = spark.read.csv("test.csv", sep=';', header=True)
df_treated =  df.withColumn("clean_origin", regexp_replace("origin_coord","POINT |\(|\)","")) \
                .withColumn("clean_destiny", regexp_replace("destination_coord","POINT |\(|\)",""))

df2 = df_treated.withColumn('origin_latitude', split(df_treated['clean_origin'], ' ').getItem(1).cast("float")) \
                .withColumn('origin_longitude', split(df_treated['clean_origin'], ' ').getItem(0).cast("float")) \
                .withColumn('destiny_latitude', split(df_treated['clean_destiny'], ' ').getItem(1).cast("float")) \
                .withColumn('destiny_longitude', split(df_treated['clean_destiny'], ' ').getItem(0).cast("float"))

df2 = df2.withColumn("trip_datetime", to_timestamp("datetime", "dd/MM/yyyy HH:mm"))

df2 = df2.drop('origin_coord', 'destination_coord','datetime','clean_origin','clean_destiny')

df3 = df2.withColumn("origin_latitude_rounded", round(col('origin_latitude')/0.1,0)) \
.withColumn("origin_longitude_rounded", round(col('origin_longitude')/0.1,0)) \
.withColumn("destiny_latitude_rounded", round(col('destiny_latitude')/0.1,0)) \
.withColumn("destiny_longitude_rounded", round(col('destiny_longitude')/0.1,0))


df4 = df3.alias('df4')
df5 = df3.join(df4, df3.region == df4.region).select('df4.*')

df6 = df5.alias('df6')
df7 = df5.join(df6, df5.region == df6.region).select('df6.*')

# df9 = df3.alias('df9')
# df8 = df7.join(df9, df7.region == df9.region).select('df9.*')

print(df7.count())
df7.write.mode("overwrite").parquet("C:/Users/Visagio/Desktop/Jobsity challenge/outputs/test.parquet")
# df7.write.mode("overwrite").csv("C:/Users/Visagio/Desktop/Jobsity challenge/outputs/test.csv")

#set variable to be used to connect the database
# database = "Jobsity"
# table = "dbo.Trips"

#write the dataframe into a sql table
# df3.write.mode("overwrite") \
#     .format("jdbc") \
#     .option("url", f"jdbc:sqlserver://localhost:1433;databaseName={database};integratedSecurity=true") \
#     .option("dbtable", table) \
#     .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
#     .save()


3885713
CPU times: total: 62.5 ms
Wall time: 11.5 s


In [51]:
from datetime import datetime

# Capture "now" once so every value derived below refers to the same instant
# (calling datetime.now() twice would make curDTObj and timeStr disagree).
curDTObj = datetime.now()

# Underscore-separated timestamp string including microseconds,
# e.g. 2022_10_16_18_11_56_172544.
timeStr = curDTObj.strftime("%Y_%m_%d_%H_%M_%S_%f")
print("Time:", timeStr)

Time: 2022_10_16_18_11_56_172544
