For this project, I decided to use Kafka just to practice connecting Spark with Kafka. Obviously, this project doesn't need Kafka, since the data is already in a CSV file. However, we can still simulate a stream by sending each row based on its start and end times.

So, we will make a `config.py` file containing a variable that decides how long the stream should take. In other words, it sets how long streaming the data to Kafka should take, from the first start date to the last end date. This variable is in seconds.
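A minimal `config.py` might look like the sketch below. The notebook only confirms that `RUNTIME` is 3600 (see the cell output further down). The `DATADIR` path is a hypothetical placeholder, assumed to end with a slash because the notebook builds paths by plain concatenation, such as `DATADIR + 'parquet/'`.

```python
# config.py: shared settings for the notebooks and the streaming script (sketch)

# Total wall-clock time, in seconds, to replay the whole dataset into Kafka.
RUNTIME = 3600  # one hour, the value printed by the notebook

# Hypothetical data directory. It must end with "/" because paths are built
# by string concatenation, e.g. DATADIR + 'parquet/'.
DATADIR = '/home/user/data/'
```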

Let's try to read it.

In [1]:
! pip install pyspark
! pip install findspark
import findspark
findspark.init()
from config import RUNTIME
RUNTIME

Defaulting to user installation because normal site-packages is not writeable
Collecting pyspark
  Using cached pyspark-3.3.0.tar.gz (281.3 MB)
  Preparing metadata (setup.py) ... done
Collecting py4j==0.10.9.5
  Using cached py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764010 sha256=05ab210fdd34481bce5e3e8b88f0f4bdf7031ba351b3aea22c4101514808ccae
  Stored in directory: /home/mohamed/.cache/pip/wheels/05/75/73/81f84d174299abca38dd6a06a5b98b08ae25fce50ab8986fa1
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0
Defaulting to user installation because normal site-packages is not writeable
Collecting findspark
  Using cached findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successful

3600

Now that this is out of the way, let's scale the time.

From the previous notebook, we know the first start date is '2016-01-14 20:18:33' and the last end date is '2022-01-01 00:00:00'. Let's convert these dates to seconds, scale them to fit inside the RUNTIME window, and stream the entire record (without the end time) with the start time and then stream the end time when its time comes. 
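To check the conversion to seconds outside Spark, the sketch below uses only Python's standard library. One subtlety: Spark's `unix_timestamp` interprets a timestamp string in the session time zone, so the resulting epoch seconds depend on that setting. The min and max values computed later in this notebook (1452795513 and 1640988000) match these two dates read as UTC+2, which appears to have been the session time zone here. That offset is an inference, not something the notebook states. The span between the two dates, which is all the scaling depends on, is the same in any fixed-offset zone.

```python
from datetime import datetime, timedelta, timezone

FMT = '%Y-%m-%d %H:%M:%S'

def to_unix(ts: str, tz: timezone) -> int:
    """Parse 'YYYY-MM-DD HH:MM:SS' as wall-clock time in `tz`; return epoch seconds."""
    return int(datetime.strptime(ts, FMT).replace(tzinfo=tz).timestamp())

utc = timezone.utc
utc_plus_2 = timezone(timedelta(hours=2))  # assumed session time zone

first, last = '2016-01-14 20:18:33', '2022-01-01 00:00:00'

print(to_unix(first, utc_plus_2), to_unix(last, utc_plus_2))  # 1452795513 1640988000
print(to_unix(first, utc), to_unix(last, utc))                # 1452802713 1640995200

# A fixed offset shifts both ends equally, so the span used for scaling is unchanged.
span_utc = to_unix(last, utc) - to_unix(first, utc)
span_plus_2 = to_unix(last, utc_plus_2) - to_unix(first, utc_plus_2)
print(span_utc == span_plus_2)  # True
```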

In [2]:
from pyspark.sql import SparkSession
from config import DATADIR

# Using 6 cores instead of * because of the limited memory on my machine: fewer threads, less memory
spark = SparkSession \
    .builder \
    .master('local[6]') \
    .appName("Transform to Stream With Pyspark") \
    .config('spark.local.dir', DATADIR + 'temp/') \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/08/08 23:13:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/08/08 23:13:11 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).
22/08/08 23:13:12 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
from pyspark.sql.functions import unix_timestamp

Let's test that and verify using an online tool.

In [4]:
main_df = spark.read.csv(DATADIR + 'US_Accidents_Dec21_updated.csv')
main_df

DataFrame[_c0: string, _c1: string, _c2: string, _c3: string, _c4: string, _c5: string, _c6: string, _c7: string, _c8: string, _c9: string, _c10: string, _c11: string, _c12: string, _c13: string, _c14: string, _c15: string, _c16: string, _c17: string, _c18: string, _c19: string, _c20: string, _c21: string, _c22: string, _c23: string, _c24: string, _c25: string, _c26: string, _c27: string, _c28: string, _c29: string, _c30: string, _c31: string, _c32: string, _c33: string, _c34: string, _c35: string, _c36: string, _c37: string, _c38: string, _c39: string, _c40: string, _c41: string, _c42: string, _c43: string, _c44: string, _c45: string, _c46: string]

The headers are missing: Spark named the columns `_c0` to `_c46`. Let's tell it that the first row is a header.

In [5]:
main_df = spark.read.csv(DATADIR + 'US_Accidents_Dec21_updated.csv', header = True)
main_df

DataFrame[ID: string, Severity: string, Start_Time: string, End_Time: string, Start_Lat: string, Start_Lng: string, End_Lat: string, End_Lng: string, Distance(mi): string, Description: string, Number: string, Street: string, Side: string, City: string, County: string, State: string, Zipcode: string, Country: string, Timezone: string, Airport_Code: string, Weather_Timestamp: string, Temperature(F): string, Wind_Chill(F): string, Humidity(%): string, Pressure(in): string, Visibility(mi): string, Wind_Direction: string, Wind_Speed(mph): string, Precipitation(in): string, Weather_Condition: string, Amenity: string, Bump: string, Crossing: string, Give_Way: string, Junction: string, No_Exit: string, Railway: string, Roundabout: string, Station: string, Stop: string, Traffic_Calming: string, Traffic_Signal: string, Turning_Loop: string, Sunrise_Sunset: string, Civil_Twilight: string, Nautical_Twilight: string, Astronomical_Twilight: string]

In [6]:
from pyspark.sql.functions import split, unix_timestamp

# Drop any fractional seconds before parsing; r'\.' matches a literal dot
main_df = main_df.withColumn('Start_Time_Unix', unix_timestamp(split(main_df['Start_Time'], r'\.').getItem(0)))
main_df = main_df.withColumn('End_Time_Unix', unix_timestamp(split(main_df['End_Time'], r'\.').getItem(0)))

main_df

DataFrame[ID: string, Severity: string, Start_Time: string, End_Time: string, Start_Lat: string, Start_Lng: string, End_Lat: string, End_Lng: string, Distance(mi): string, Description: string, Number: string, Street: string, Side: string, City: string, County: string, State: string, Zipcode: string, Country: string, Timezone: string, Airport_Code: string, Weather_Timestamp: string, Temperature(F): string, Wind_Chill(F): string, Humidity(%): string, Pressure(in): string, Visibility(mi): string, Wind_Direction: string, Wind_Speed(mph): string, Precipitation(in): string, Weather_Condition: string, Amenity: string, Bump: string, Crossing: string, Give_Way: string, Junction: string, No_Exit: string, Railway: string, Roundabout: string, Station: string, Stop: string, Traffic_Calming: string, Traffic_Signal: string, Turning_Loop: string, Sunrise_Sunset: string, Civil_Twilight: string, Nautical_Twilight: string, Astronomical_Twilight: string, Start_Time_Unix: bigint, End_Time_Unix: bigint]

We used `split` because some of the values in the `Start_Time` and `End_Time` columns have fractional seconds. We cut these off with `split`, remembering to escape the `.` with `\`, because `split` treats its pattern as a regular expression.
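Spark's `split` takes a Java regular expression, which is why the dot needs escaping. Python's `re` module treats `.` the same way, so it can illustrate the difference. The sample timestamp below is made up for the example.

```python
import re

ts = '2016-02-08 05:46:00.000000000'  # hypothetical value with fractional seconds

# Escaped: split on a literal dot and keep the part before it.
print(re.split(r'\.', ts)[0])  # 2016-02-08 05:46:00

# Unescaped: '.' matches every character, so the first piece is an empty string.
print(re.split('.', ts)[0] == '')  # True

# A value without fractional seconds passes through unchanged.
print(re.split(r'\.', '2016-02-08 05:46:00')[0])  # 2016-02-08 05:46:00
```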

Now we're close! Let's create a table with one row per Unix time and a column that states the ID. If two timestamps share the same ID, the earlier one is the start and the later one is the end.

But first, we need to clean the ID column! This should be easy.

Actually, let's just set the types appropriately while we're at it.

In [7]:
from pyspark.sql.functions import split
from pyspark.sql.types import TimestampType, ByteType, IntegerType, FloatType, DoubleType, BooleanType

bool_columns = ['Amenity','Bump','Crossing','Give_Way','Junction','No_Exit','Railway','Roundabout'
                ,'Station','Stop','Traffic_Calming','Traffic_Signal','Turning_Loop']

for column in bool_columns:
    main_df = main_df.withColumn(column, main_df[column].cast(BooleanType()))

float_columns = ['Precipitation(in)','Wind_Speed(mph)' , 'Temperature(F)', 'Wind_Chill(F)'
                , 'Pressure(in)','Humidity(%)' , 'Visibility(mi)']

float_columns_renamed = ['Precipitation_in','Wind_Speed_mph' , 'Temperature_F', 'Wind_Chill_F'
                , 'Pressure_in','Humidity_Percent' , 'Visibility_mi']

# Other columns don't have nulls or we prefer them to stay nulls 
main_df = main_df.fillna('-1', subset = float_columns + ['Number', 'Zipcode'])

for column,renamed in zip(float_columns, float_columns_renamed):
    main_df = main_df.withColumn(renamed, main_df[column].cast(FloatType()))
    
string_nulls = ['Wind_Direction','Weather_Condition','Airport_Code', 'City', 'Street' 
               ,'Astronomical_Twilight', 'Nautical_Twilight', 'Civil_Twilight', 'Sunrise_Sunset']

main_df = main_df.fillna('Not Available', subset = string_nulls)
main_df = main_df.fillna('2022-01-01 00:00:00', subset = ['Weather_Timestamp'])

main_df = main_df.drop(*float_columns)

main_df = main_df.withColumn('ID', split(main_df['ID'],'-').getItem(1).cast(IntegerType()))
main_df = (main_df.withColumn('Severity', main_df['Severity'].cast(ByteType())) 
                .withColumn('Start_Time', main_df['Start_Time'].cast(TimestampType())) 
                .withColumn('End_Time', main_df['End_Time'].cast(TimestampType())) 
                .withColumn('Start_Lat', main_df['Start_Lat'].cast(DoubleType())) 
                .withColumn('End_Lat', main_df['End_Lat'].cast(DoubleType())) 
                .withColumn('Start_Lng', main_df['Start_Lng'].cast(DoubleType()))  
                .withColumn('End_Lng', main_df['End_Lng'].cast(DoubleType()))  
                .withColumn('Distance_mi', main_df['Distance(mi)'].cast(DoubleType()))
                .drop('Distance(mi)')
                .withColumn('Number', main_df['Number'].cast(IntegerType()))
                .withColumn('Weather_Timestamp', main_df['Weather_Timestamp'].cast(TimestampType()))
          )


main_df.select('ID','Start_Time_Unix','End_Time_Unix').show()
main_df.printSchema()

+---+---------------+-------------+
| ID|Start_Time_Unix|End_Time_Unix|
+---+---------------+-------------+
|  1|     1454884628|   1454906228|
|  2|     1454903780|   1454925380|
|  3|     1454904939|   1454926539|
|  4|     1454907105|   1454928705|
|  5|     1454910823|   1454932423|
|  6|     1454912217|   1454933817|
|  7|     1454912141|   1454933741|
|  8|     1454925106|   1454946706|
|  9|     1454933997|   1454955597|
| 10|     1454937403|   1454959003|
| 11|     1454939030|   1454960630|
| 12|     1454943057|   1454964657|
| 13|     1454945259|   1454966859|
| 14|     1454945418|   1454967018|
| 15|     1454947871|   1454969471|
| 16|     1454947871|   1454969471|
| 17|     1454953662|   1454975262|
| 18|     1454953662|   1454975262|
| 19|     1454955202|   1454976802|
| 20|     1454958017|   1454979617|
+---+---------------+-------------+
only showing top 20 rows

root
 |-- ID: integer (nullable = true)
 |-- Severity: byte (nullable = true)
 |-- Start_Time: timestamp (null

I know someone might ask: why set the types at all? Well, if any of the downstream services in our project expects a type different from the actual one, it can raise an error or return nulls.

In the following notebooks, I'll try to point out each place where not changing the type can cause a problem.

In [8]:
from pyspark.sql.functions import lit
temp_df = main_df.select(lit('Start'),'ID','Start_Time_Unix') \
        .union(main_df.select(lit('End'),'ID','End_Time_Unix')) \
        .withColumnRenamed('Start', 'Event') \
        .orderBy('Start_Time_Unix') \
        .orderBy('ID')
temp_df.show()
temp_df.printSchema()



+-----+---+---------------+
|Event| ID|Start_Time_Unix|
+-----+---+---------------+
|Start|  1|     1454884628|
|  End|  1|     1454906228|
|  End|  2|     1454925380|
|Start|  2|     1454903780|
|  End|  3|     1454926539|
|Start|  3|     1454904939|
|Start|  4|     1454907105|
|  End|  4|     1454928705|
|Start|  5|     1454910823|
|  End|  5|     1454932423|
|  End|  6|     1454933817|
|Start|  6|     1454912217|
|Start|  7|     1454912141|
|  End|  7|     1454933741|
|Start|  8|     1454925106|
|  End|  8|     1454946706|
|  End|  9|     1454955597|
|Start|  9|     1454933997|
|  End| 10|     1454959003|
|Start| 10|     1454937403|
+-----+---+---------------+
only showing top 20 rows

root
 |-- Event: string (nullable = false)
 |-- ID: integer (nullable = true)
 |-- Start_Time_Unix: long (nullable = true)





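Almost perfect! The third column could use a better name, since it holds both start and end times. Also, within an ID the `Start` and `End` rows are not always in time order (see IDs 2, 3, 6, 9 and 10), because the second `orderBy` replaces the first instead of adding a secondary sort key. That doesn't matter here, since the rows are sorted by `Stream_Time` later on.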
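The reshaping in the cell above can be sketched in plain Python to show what a single two-key sort does. Each accident becomes two events, and sorting by `(ID, time)` puts every `Start` before its `End` as long as the start time precedes the end time. The two rows are copied from the output above, and `events` is only an illustrative name. In Spark, the equivalent two-key sort is a single `orderBy('ID', 'Start_Time_Unix')`.

```python
# (ID, start_unix, end_unix) rows taken from the table above
rows = [
    (2, 1454903780, 1454925380),
    (1, 1454884628, 1454906228),
]

# Long format: one (event, id, time) tuple per start and per end
events = [('Start', i, s) for i, s, _ in rows] + [('End', i, e) for i, _, e in rows]

# One sort with two keys: by ID first, then by time within each ID
by_id = sorted(events, key=lambda ev: (ev[1], ev[2]))
for ev in by_id:
    print(ev)

# Sorting by time alone gives the order the stream actually needs
by_time = sorted(events, key=lambda ev: ev[2])
```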

You may notice that each ID appears twice (once for its start and once for its end), and that the time values are on the order of 10^9, since they count seconds since the Unix epoch.

So, after all this work, we still haven't done the scaling. From here it's easy: we subtract the earliest time, divide by the difference between the latest and earliest times, and multiply by `RUNTIME`.
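Written as a formula, with $t$ an event time in Unix seconds and $t_{\min}$, $t_{\max}$ the earliest and latest times in the data, the offset of that event in the stream is:

$$
t_{\text{stream}} = \frac{t - t_{\min}}{t_{\max} - t_{\min}} \cdot \text{RUNTIME}
$$

so $t_{\min}$ maps to 0 and $t_{\max}$ maps to exactly RUNTIME seconds. Dividing by $t_{\max}$ alone would squeeze every event into a window slightly shorter than RUNTIME.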

In [9]:
temp_df = temp_df.withColumn('Time_Unix',temp_df['Start_Time_Unix'])
earliest = temp_df.agg({'Time_Unix':"min"}).collect()
earliest

                                                                                

[Row(min(Time_Unix)=1452795513)]

The value is inside a PySpark `Row`. Don't worry, we can get it out.
A quick Google search turns up a few ways to do so; here are two of them.

In [10]:
# earliest = earliest[0]['min(Time_Unix)']  # access by column name
earliest = earliest[0][0]
earliest

1452795513

In [11]:
latest = temp_df.agg({"Time_Unix":"max"}).collect()
latest = latest[0][0]
latest

                                                                                

1640988000

In [12]:
# Plain column arithmetic, no UDF needed; DoubleType keeps full precision for Unix seconds
temp_df = temp_df.withColumn('Stream_Time',
            (temp_df['Time_Unix'].cast(DoubleType()) - earliest) * RUNTIME / (latest - earliest))
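Plugging the actual `earliest` and `latest` values from the cells above into the formula shows how much data time each stream second covers, and why the cast type matters. A `FloatType` column is a 32-bit IEEE 754 float, which near 1.45 × 10^9 can only represent multiples of 128 seconds. A 64-bit double (Spark's `DoubleType`) represents these integers exactly. The sketch uses `struct` to round-trip a value through 32-bit precision; `to_float32` and `stream_time` are illustrative helpers, not Spark functions.

```python
import struct

RUNTIME = 3600
earliest = 1452795513  # min(Time_Unix) from the notebook
latest = 1640988000    # max(Time_Unix) from the notebook

def to_float32(x: float) -> float:
    """Round-trip a number through 32-bit float precision (like FloatType)."""
    return struct.unpack('f', struct.pack('f', x))[0]

def stream_time(t: int) -> float:
    # Same scaling formula as the Spark expression, in double precision
    return (t - earliest) * RUNTIME / (latest - earliest)

span = latest - earliest
print(span)            # 188192487 seconds of accident data
print(span / RUNTIME)  # ~52275.69 data seconds per stream second (about 14.5 hours)

print(stream_time(earliest), stream_time(latest))  # 0.0 3600.0

# Error introduced by storing the raw time as a 32-bit float
print(to_float32(earliest) - earliest)  # 7.0 (stored as 1452795520)
```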

A job well done!
Just one final step.

In [13]:
temp_df = temp_df.withColumnRenamed('ID','temp_id') # so we can drop it
to_delete = ('Start_Time_Unix','End_Time_Unix','Time_Unix',"temp_id")

In [14]:
stream_df = temp_df.join(main_df, temp_df.temp_id == main_df.ID)
stream_df = stream_df.drop(*to_delete)
stream_df = stream_df.orderBy('Stream_Time')

In [15]:
stream_df.count()

                                                                                

5690684

That was a lot of work. Now we'll write the result to a parquet file and read from it in the streaming script. First, though, we will repartition the DataFrame. Why? To avoid running out of memory: more partitions means each partition is smaller (and, here, more balanced), which lowers memory pressure.

In [16]:
from pyspark.sql.functions import spark_partition_id
stream_df.groupBy(spark_partition_id()).count().show()
stream_df = stream_df.repartition(50)
stream_df.groupBy(spark_partition_id()).count().show()


22/08/08 23:13:49 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.


                                                                                

+--------------------+-------+
|SPARK_PARTITION_ID()|  count|
+--------------------+-------+
|                   0| 840255|
|                   1| 919366|
|                   2| 929988|
|                   3| 898228|
|                   4| 951716|
|                   5|1151131|
+--------------------+-------+





+--------------------+------+
|SPARK_PARTITION_ID()| count|
+--------------------+------+
|                   0|113814|
|                   1|113814|
|                   2|113814|
|                   3|113814|
|                   4|113814|
|                   5|113814|
|                   6|113814|
|                   7|113814|
|                   8|113814|
|                   9|113815|
|                  10|113814|
|                  11|113814|
|                  12|113814|
|                  13|113815|
|                  14|113815|
|                  15|113815|
|                  16|113815|
|                  17|113815|
|                  18|113815|
|                  19|113814|
+--------------------+------+
only showing top 20 rows
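Why are the new partitions so even? Without partitioning columns, `repartition(n)` distributes rows round-robin, starting each input partition at a pseudo-random position. The sketch below simulates that on counts only, using the six partition sizes printed above. The fixed seed and the `round_robin_counts` helper are illustrative assumptions, not Spark's actual start positions.

```python
import random

def round_robin_counts(input_sizes, num_partitions, seed=0):
    """Count rows per output partition for a simulated round-robin shuffle."""
    rng = random.Random(seed)
    counts = [0] * num_partitions
    for n in input_sizes:
        start = rng.randrange(num_partitions)  # stand-in for Spark's pseudo-random start
        full, extra = divmod(n, num_partitions)
        for p in range(num_partitions):
            counts[p] += full  # every output partition gets n // P rows from this input
        for k in range(extra):
            counts[(start + k) % num_partitions] += 1  # leftover rows continue the cycle
    return counts

# Partition sizes before repartition, from the first table above
before = [840255, 919366, 929988, 898228, 951716, 1151131]
after = round_robin_counts(before, 50)

print(sum(after))               # 5690684: no rows lost
print(min(after), max(after))   # within 6 of each other (at most one extra row per input)
print(divmod(sum(before), 50))  # (113813, 34): the perfectly even split
```

Each input partition adds either `n // 50` or one more row to every output partition, so with six inputs the counts can only range from 113811 to 113817. Every count in the second table falls inside that range.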



                                                                                

In [None]:
targetfolder = DATADIR + 'parquet/'
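# Sort after the round-robin repartition so the single output file is in Stream_Time order
stream_df.orderBy('Stream_Time').coalesce(1).write.parquet(targetfolder)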

22/08/08 23:14:00 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.



What is `coalesce`? I wanted a single parquet file. Without `coalesce(1)` in the last line, Spark writes a folder with one parquet file per partition. Even with it, the output is still a folder, but it holds a single `part-*.parquet` data file plus bookkeeping files such as `_SUCCESS`.

To get a single file, we need to reduce the number of partitions to one, using either `repartition` or `coalesce`. Both can reduce the number of partitions, but `coalesce` is cheaper: it only merges existing partitions and avoids a full shuffle, whereas `repartition` always shuffles the data (which is also why only `repartition` can increase the number of partitions).
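The difference also matters for row order. `coalesce(1)` reads the existing partitions one after another in partition order, while a round-robin `repartition` deals rows out across partitions. The plain-Python sketch below uses lists as stand-ins for partitions; `coalesce_to_one` and `round_robin` are illustrative helpers, and the round-robin uses one running counter for simplicity (Spark keeps one per input partition). It shows why a sort has to come after the last round-robin shuffle if the written file should be in `Stream_Time` order.

```python
def coalesce_to_one(partitions):
    """Concatenate partitions in index order, as coalesce(1) does (no shuffle)."""
    return [row for part in partitions for row in part]

def round_robin(partitions, n, start=0):
    """Deal rows into n new partitions in turn, like a round-robin shuffle."""
    out = [[] for _ in range(n)]
    pos = start
    for part in partitions:
        for row in part:
            out[pos % n].append(row)
            pos += 1
    return out

# Globally sorted data split into three range partitions, as orderBy leaves it
sorted_parts = [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]

print(coalesce_to_one(sorted_parts))                  # [0, 1, ..., 9]: order kept
print(coalesce_to_one(round_robin(sorted_parts, 2)))  # [0, 2, 4, 6, 8, 1, 3, 5, 7, 9]: order lost
```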

Now let's rename the output file so that the streaming script can read it without this odd generated name.

In [None]:
import os
os.listdir(targetfolder)

What even is that name, Spark? It's OK, we've got it.

In [None]:
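# Pick the data file explicitly: the folder also holds _SUCCESS and .crc files, and listdir order is arbitrary
filename = next(f for f in os.listdir(targetfolder) if f.startswith('part-') and f.endswith('.parquet'))
os.rename(f'{targetfolder}{filename}', f'{targetfolder}stream_df.parquet')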

And we're done with this one. Next, let's stream that DataFrame to a Kafka topic. Woohoo!