In [11]:
%%bash
pwd

/home/dilanveracruz


In [12]:
from pyspark.sql import SparkSession
spark = SparkSession\
    .builder\
    .appName("KafkaProducer")\
    .config("spark.sql.shuffle.partitions",6)\
    .config("spark.sql.repl.eagereval.enabled",True)\
    .getOrCreate()

In [13]:
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

##### import the data 

In [14]:
from pyspark.sql.functions import col

test_df = spark.read.parquet("featured")
test_df = test_df.orderBy(col('date'), col('Hour'))
test_df.printSchema()
test_df.show(5, vertical=True)

root
 |-- Hour: integer (nullable = true)
 |-- Day: integer (nullable = true)
 |-- DayofWeek: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- end_station_id: double (nullable = true)
 |-- num_rentals: long (nullable = true)
 |-- station_id: integer (nullable = true)
 |-- station_name: string (nullable = true)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- area_cleaned: string (nullable = true)
 |-- date: integer (nullable = true)
 |-- sunshine: double (nullable = true)
 |-- mean: double (nullable = true)
 |-- cloud: double (nullable = true)
 |-- prec: double (nullable = true)
 |-- press: double (nullable = true)



[Stage 23:>                                                         (0 + 6) / 6]

-RECORD 0------------------------------
 Hour           | 0                    
 Day            | 28                   
 DayofWeek      | 4                    
 Month          | 12                   
 end_station_id | 436.0                
 num_rentals    | 2                    
 station_id     | 246                  
 station_name   | Berry Street, Cle... 
 longitude      | -0.0999941           
 latitude       | 51.5229              
 area_cleaned   | Clerkenwell          
 date           | 20161228             
 sunshine       | 4.7                  
 mean           | 3.0                  
 cloud          | 2.0                  
 prec           | 0.2                  
 press          | 104260.0             
-RECORD 1------------------------------
 Hour           | 0                    
 Day            | 28                   
 DayofWeek      | 4                    
 Month          | 12                   
 end_station_id | 378.0                
 num_rentals    | 1                    


                                                                                

##### Discretize the categorical data 

from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorIndexer, VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.evaluation import RegressionEvaluator

###### Define the columns you want to index
columns_to_index = ["station_name", "area_cleaned"]
non_categorical_cols = [x for x in test_df.columns if x not in columns_to_index]
index_output_cols = [x + ' Index' for x in columns_to_index]
ohe_output_cols = [x + ' OHE' for x in columns_to_index]


###### Create a list of StringIndexer stages for each column
string_indexer = StringIndexer(inputCols=columns_to_index, outputCols=index_output_cols, handleInvalid="skip")

ohe_encoder = OneHotEncoder(inputCols=index_output_cols, outputCols=ohe_output_cols)

assembler_inputs = ohe_output_cols + non_categorical_cols
vec_assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")
assembler_inputs

In [15]:
#pip install kafka-python
#conda install -c conda-forge kafka-python

##### Split the data

In [16]:
# Time thresholds for splitting the data
timestamp_threshold_1 = '20190601'
timestamp_threshold_2 = '20191231'

stream_output_path = "stream"

# Time-series 1: Timestamps before the threshold
training_set = test_df.filter(col('date') < timestamp_threshold_1)
# Time-series 2: Timestamps after or equal to the threshold
test_set = test_df.filter((col('date') >= timestamp_threshold_1) & (col('date') < timestamp_threshold_2))
# Time-series 3: Timestamps after the second threshold (streaming data)
stream_set = test_df.filter(col('date') > timestamp_threshold_2)

# Write the training set to the output path
training_set.write.mode("overwrite").parquet(stream_output_path)

# Print the number of rows in each part
print(f"There are {training_set.count()} rows in the training set and {test_set.count()} in the test set.")


                                                                                

There are 4232906 rows in the training set and 1073630 in the test set.


##### Get the data for the first 15 days of january for the data streamming

In [17]:
stream_set = stream_set.filter((col('month') == 1) & (col('Day') <= 15) )
stream_set.count()
stream_set.printSchema()

root
 |-- Hour: integer (nullable = true)
 |-- Day: integer (nullable = true)
 |-- DayofWeek: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- end_station_id: double (nullable = true)
 |-- num_rentals: long (nullable = true)
 |-- station_id: integer (nullable = true)
 |-- station_name: string (nullable = true)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- area_cleaned: string (nullable = true)
 |-- date: integer (nullable = true)
 |-- sunshine: double (nullable = true)
 |-- mean: double (nullable = true)
 |-- cloud: double (nullable = true)
 |-- prec: double (nullable = true)
 |-- press: double (nullable = true)



In [19]:
import json
from kafka import KafkaProducer
from kafka.errors import KafkaError
from json import dumps
from time import sleep
from datetime import datetime

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
future = producer.send('my-topic', b'Initializing')

# Block for 'synchronous' sends
try:
    record_metadata = future.get(timeout=10)
except KafkaError:
    # Decide what to do if produce request failed...
    log.exception()
    pass

# Successful result returns assigned partition and offset
print (record_metadata.topic)
print (record_metadata.partition)
print (record_metadata.offset)

data_list = stream_set.collect()  

i = 0
# Iterate over the list of Row objects and convert each row to a dictionary
for row in data_list:
    row_dict = row.asDict()
    # Process the row dictionary as needed
    row_json = json.dumps(row_dict).encode('utf-8')  # Serialize to JSON and encode to bytes
    producer.send('my-topic', value=row_json)
    #producer.send('my-topic', row_dict)

# block until all async messages are sent
producer.flush()

# configure multiple retries
producer = KafkaProducer(retries=5)

my-topic
0
1237583
