# ALL installs 

In [32]:
# ! pip install requests
# ! pip install kafka-python
# ! pip install pyspark
# !pip  install -r requirements.txt

In [33]:
# !pip freeze > requirements.txt

# ALL imports

In [34]:
from allsecrets import *
from coordinates import *
from schemas import *
import json,requests
from json import dumps,loads 
from kafka import KafkaProducer, KafkaConsumer
from datetime import datetime

In [35]:
from pyspark.sql import SparkSession , functions as F
from pyspark.sql.functions import udf,col,countDistinct,date_format,row_number
from pyspark.sql.types import FloatType , StringType , IntegerType , StructType , StructField , TimestampType
from pyspark.sql.window import Window 

spark = SparkSession.builder.appName('Weather')\
    .getOrCreate()

### Json print function

In [5]:
def json_print(json_object):
    text = json.dumps(json_object, sort_keys=True, indent=4) # sort_keys=True means sort the keys in alphabetical order
    # indent 4 means 4 spaces for each indentation
    print(text)

# 1st Producer and consumer

## Producer  to handle API response and send to Kafka topic

In [38]:
Producer_API = KafkaProducer(bootstrap_servers=[
                         'localhost:9092'], value_serializer=lambda x: dumps(x).encode('utf-8'))

for lat, lon in lat_lon.items():
    parameters = {'lat': lat, "lon": lon, "appid": open_weather_api}
    weather_response = requests.get(
        "http://api.openweathermap.org/data/2.5/forecast", params=parameters)
    Producer_API.send('weather', value=weather_response.json())
    Producer_API.flush()  # flush the data to the kafka broker ( topic) and  make sure data  is sent to the kafka broker and  not lost in the buffer

## Consumer to read from Kafka and write to file

In [39]:
# if consumer_dump.json file exists, delete it
import os
if os.path.exists('consumer_dump.json'):
    os.remove('consumer_dump.json')

In [40]:

Consumer_API = KafkaConsumer('weather', bootstrap_servers=['localhost:9092'], auto_offset_reset='earliest',
                         enable_auto_commit=True, value_deserializer=lambda x: loads(x.decode('utf-8')))

In [41]:
# consume the data from the kafka broker and save to json file   %%%% stop it after some time   10-15 sec %%%%%
for message in Consumer_API:
    message = message.value
    with open('consumer_dump.json', 'a') as f:
        json.dump(message, f)
        # append new line  to end of  each json object
        f.write('\n')

KeyboardInterrupt: 

In [None]:
# Consumer_API.topics()

In [None]:
# close consumer to free up resources after reading the messages
# Consumer_API.close()

In [None]:
# create topic
# ! kafka-topics --bootstrap-server localhost:9092 --create --topic weather

In [None]:
# delete topic
# ! kafka-topics --bootstrap-server localhost:9092 --delete --topic weather

# Spark magic

## Unpacking and Exploding the  columns

In [48]:
weather_df = spark.read.json('consumer_dump.json')


nested_json_type=type (weather_df.select("city").take(1)[0][0]) # get the type of the nested json / dictionary object

# function to expand the nested json object only
def expand_json(df):
    for c in df.columns:
        if type(df.select(c).take(1)[0][0]) == nested_json_type:
            df = df.select("*", F.col(c+".*")).drop(c)
            print(c)
    return df
# weather_df.show(26)
# weather_df.printSchema()

In [49]:

#explode "list" column  as it is in list  of dictionary containing 40 records
weather_df = weather_df.select ("*", F.explode("list").alias("every_3_hr_weather")).drop("list")

# weather_df.show(30)
# weather_df.printSchema()


In [50]:
# LEVEL 1 Unpacking (city and every_3_hr_weather  column)
weather_df=expand_json(weather_df)
# weather_df.show(30)
# weather_df.printSchema()

# expand_json function can be replaced by   below  code :  and more ...
# df_weather = df_weather.select("*", F.col("city.*")) \
#     .drop("city")

# df_weather = df_weather.select("*", F.col("coord.*")) \
#     .drop("coord")

city
every_3_hr_weather


In [51]:
#explode weather columnn ,it is in list  of dictionary conataing 1 record only
weather_df = weather_df.select ("*", F.explode("weather").alias("weather_dict")).drop("weather")

# LEVEL 2 Unpacking  (coord,clouds,main,rain,sys,wind,weather_dict column)
weather_df=expand_json(weather_df)

# rename column "all" to "coludiness"
weather_df = weather_df.withColumnRenamed("all", "cloudiness")

# weather_df.show(30)
# weather_df.printSchema()

coord
clouds
main
sys
wind
weather_dict


In [52]:
#  delete the column which are not required
cleaned_weather_df = weather_df.drop("cod","message","cnt","3h","icon","timezone","population","country","id","temp","temp_kf")

# cleaned_weather_df.printSchema()
cleaned_weather_df.show(10)

+--------------+----------+----------+----------+-------------------+---+----------+-------+-------+----------+----------+----------+--------+--------+---------+--------+--------+---+---+----+-----+-----------+------+
|          name|   sunrise|    sunset|        dt|             dt_txt|pop|visibility|    lat|    lon|cloudiness|feels_like|grnd_level|humidity|pressure|sea_level|temp_max|temp_min|pod|deg|gust|speed|description|  main|
+--------------+----------+----------+----------+-------------------+---+----------+-------+-------+----------+----------+----------+--------+--------+---------+--------+--------+---+---+----+-----+-----------+------+
|Baudhatinchule|1668732107|1668770745|1668762000|2022-11-18 09:00:00|0.0|     10000|27.7167|85.3667|        14|    292.34|       871|      48|    1018|     1018|  293.04|  292.63|  d|190|1.17| 0.43| few clouds|Clouds|
|Baudhatinchule|1668732107|1668770745|1668772800|2022-11-18 12:00:00|0.0|     10000|27.7167|85.3667|         9|     287.5|      

# Questions

## Question 1

In [53]:
# UDf to convert unix time stamp to date time  
def unix_to_datetime(unix_time):
    return datetime.fromtimestamp(unix_time).strftime('%Y-%m-%d %H:%M:%S')


unix_to_datetime_udf = F.udf(unix_to_datetime, StringType())


In [54]:
# apply the UDF to the  dt,sunrise,sunset column
Q1_weather_df = cleaned_weather_df.withColumn("dt", unix_to_datetime_udf("dt")).withColumn("sunrise", unix_to_datetime_udf("sunrise")).withColumn("sunset", unix_to_datetime_udf("sunset"))
Q1_weather_df.show(5)
# Q1_weather_df.printSchema()

+--------------+-------------------+-------------------+-------------------+-------------------+---+----------+-------+-------+----------+----------+----------+--------+--------+---------+--------+--------+---+---+----+-----+-----------+------+
|          name|            sunrise|             sunset|                 dt|             dt_txt|pop|visibility|    lat|    lon|cloudiness|feels_like|grnd_level|humidity|pressure|sea_level|temp_max|temp_min|pod|deg|gust|speed|description|  main|
+--------------+-------------------+-------------------+-------------------+-------------------+---+----------+-------+-------+----------+----------+----------+--------+--------+---------+--------+--------+---+---+----+-----+-----------+------+
|Baudhatinchule|2022-11-18 06:26:47|2022-11-18 17:10:45|2022-11-18 14:45:00|2022-11-18 09:00:00|0.0|     10000|27.7167|85.3667|        14|    292.34|       871|      48|    1018|     1018|  293.04|  292.63|  d|190|1.17| 0.43| few clouds|Clouds|
|Baudhatinchule|2022

###  Q1 Producer to save transformation to Kafka topic

In [57]:
Q1_Producer = KafkaProducer(bootstrap_servers=[
                         'localhost:9092'], value_serializer=lambda x: dumps(x).encode('utf-8'))


for row in Q1_weather_df.rdd.collect():
    # send data to kafka topic in schema and payload json format 
    Q1_Producer.send( 'Q1_date_time', {"schema": Schema_Q1, "payload": row.asDict()})
    Q1_Producer.flush()  # flush the data to the kafka broker ( topic) and  make sure data  is sent to the kafka broker and  not lost in the buffer

### Q1 connector config

In [58]:

Q1_connector_name="Q1_date_time-sink"
config={
    "name":Q1_connector_name ,
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url": "jdbc:postgresql://localhost:5432/Kafka_project",
        "tasks.max": "1", 
        "topics": "Q1_date_time",
        "insert.mode": "insert",
        "connection.user": "amrit",
        "connection.password": "1234",
        "table.name.format": "Q1_date_time",
        "auto.create": "true",
        "auto.evolve": "true",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "true",
        "auto.offset.reset": "earliest"
    }
}

In [59]:
# create sink connector
response = requests.post("http://localhost:8083/connectors", headers={"Content-Type": "application/json"}, data=json.dumps(config))

print(response.json())

{'name': 'Q1_date_time-sink', 'config': {'connector.class': 'io.confluent.connect.jdbc.JdbcSinkConnector', 'connection.url': 'jdbc:postgresql://localhost:5432/Kafka_project', 'tasks.max': '1', 'topics': 'Q1_date_time', 'insert.mode': 'insert', 'connection.user': 'amrit', 'connection.password': '1234', 'table.name.format': 'Q1_date_time', 'auto.create': 'true', 'auto.evolve': 'true', 'value.converter': 'org.apache.kafka.connect.json.JsonConverter', 'value.converter.schemas.enable': 'true', 'auto.offset.reset': 'earliest', 'name': 'Q1_date_time-sink'}, 'tasks': [], 'type': 'sink'}


In [60]:
# connector status=
response = requests.get("http://localhost:8083/connectors/"+Q1_connector_name+"/status")
print(response.json())

{'name': 'Q1_date_time-sink', 'connector': {'state': 'RUNNING', 'worker_id': '127.0.1.1:8083'}, 'tasks': [{'id': 0, 'state': 'RUNNING', 'worker_id': '127.0.1.1:8083'}], 'type': 'sink'}


## Question 2

In [61]:
# for each city , find highest wind speed , temperature , humidity , pressure , cloudiness 
Q2_weather_df = cleaned_weather_df. \
    groupBy("name"). \
    agg(F.max("speed").alias("max_wind_speed"),F.max("temp_max").alias("max_temp"),F.max("humidity").alias("max_humidity")
        ,F.max("pressure").alias("max_pressure"),F.max("cloudiness").alias("max_cloudiness")) .sort("name")

Q2_weather_df.show(10)
Q2_weather_df.printSchema()


+--------------+--------------+--------+------------+------------+--------------+
|          name|max_wind_speed|max_temp|max_humidity|max_pressure|max_cloudiness|
+--------------+--------------+--------+------------+------------+--------------+
|      Basahiya|          2.91|  300.78|          76|        1017|            73|
|Baudhatinchule|          1.59|  294.45|          86|        1019|            40|
|     Bhaktapur|          1.46|  294.22|          86|        1019|            40|
|     Bharatpur|          2.26|  300.23|          90|        1018|            12|
|    Biratnagar|           3.2|  301.51|          74|        1016|            92|
| Birendranagar|          2.27|  297.36|          83|        1019|            35|
|       Birgunj|          2.94|  300.76|          77|        1017|            31|
|        Butwāl|          2.39|  298.67|          86|        1018|            25|
|     Dhangadhi|          3.01|  299.17|          75|        1018|            65|
|        Dharān|

###  Q2 Producer to save transformation to Kafka topic

In [62]:
Q2_Producer = KafkaProducer(bootstrap_servers=[
                         'localhost:9092'], value_serializer=lambda x: dumps(x).encode('utf-8'))


for row in Q2_weather_df.rdd.collect():
    # send data to kafka topic in schema and payload json format 
    Q2_Producer.send( 'Q2_max', {"schema": Schema_Q2, "payload": row.asDict()})
    Q2_Producer.flush()  # flush the data to the kafka broker ( topic) and  make sure data  is sent to the kafka broker and  not lost in the buffer

### Q2 connector config

In [63]:

Q2_connector_name="Q2_max-sink"
config={
    "name":Q2_connector_name ,
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url": "jdbc:postgresql://localhost:5432/Kafka_project",
        "tasks.max": "1", 
        "topics": "Q2_max",
        "insert.mode": "insert",
        "connection.user": "amrit",
        "connection.password": "1234",
        "table.name.format": "Q2_max",
        "auto.create": "true",
        "auto.evolve": "true",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "true",
        "auto.offset.reset": "earliest"
    }
}

In [64]:
# create sink connector
response = requests.post("http://localhost:8083/connectors", headers={"Content-Type": "application/json"}, data=json.dumps(config))

print(response.json())

{'name': 'Q2_max-sink', 'config': {'connector.class': 'io.confluent.connect.jdbc.JdbcSinkConnector', 'connection.url': 'jdbc:postgresql://localhost:5432/Kafka_project', 'tasks.max': '1', 'topics': 'Q2_max', 'insert.mode': 'insert', 'connection.user': 'amrit', 'connection.password': '1234', 'table.name.format': 'Q2_max', 'auto.create': 'true', 'auto.evolve': 'true', 'value.converter': 'org.apache.kafka.connect.json.JsonConverter', 'value.converter.schemas.enable': 'true', 'auto.offset.reset': 'earliest', 'name': 'Q2_max-sink'}, 'tasks': [], 'type': 'sink'}


In [65]:
# connector status=
response = requests.get("http://localhost:8083/connectors/"+Q2_connector_name+"/status")
print(response.json())

{'name': 'Q2_max-sink', 'connector': {'state': 'RUNNING', 'worker_id': '127.0.1.1:8083'}, 'tasks': [{'id': 0, 'state': 'RUNNING', 'worker_id': '127.0.1.1:8083'}], 'type': 'sink'}


## Question 3

In [66]:
def extract_day(dt_txt):
    return dt_txt.split(" ")[0]

extract_day_udf = F.udf(extract_day, StringType())

Q3_weather_df = cleaned_weather_df.withColumn("day", extract_day_udf("dt_txt"))

#  for each city  , each day , list min temperature  at night time pod=n
Q3_weather_df = Q3_weather_df.filter(Q3_weather_df.pod == "n").groupBy("name","day").agg(F.min("temp_min").alias("min_temp_night")).sort("name","day")

# Q3_weather_df.show(5)
# Q3_weather_df.printSchema()


###  Q3 Producer to save transformation to Kafka topic

In [67]:
Q3_Producer = KafkaProducer(bootstrap_servers=[
                         'localhost:9092'], value_serializer=lambda x: dumps(x).encode('utf-8'))


for row in Q3_weather_df.rdd.collect():
    # send data to kafka topic in schema and payload json format 
    Q3_Producer.send( 'Q3_min_night', {"schema": Schema_Q3, "payload": row.asDict()})
    Q3_Producer.flush()  # flush the data to the kafka broker ( topic) and  make sure data  is sent to the kafka broker and  not lost in the buffer

### Q3 connector config

In [68]:

Q3_connector_name="Q3_min_night-sink"
config={
    "name":Q3_connector_name ,
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url": "jdbc:postgresql://localhost:5432/Kafka_project",
        "tasks.max": "1", 
        "topics": "Q3_min_night",
        "insert.mode": "insert",
        "connection.user": "amrit",
        "connection.password": "1234",
        "table.name.format": "Q3_min_night",
        "auto.create": "true",
        "auto.evolve": "true",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "true",
        "auto.offset.reset": "earliest"
    }
}

In [69]:
# create sink connector
response = requests.post("http://localhost:8083/connectors", headers={"Content-Type": "application/json"}, data=json.dumps(config))

print(response.json())

{'name': 'Q3_min_night-sink', 'config': {'connector.class': 'io.confluent.connect.jdbc.JdbcSinkConnector', 'connection.url': 'jdbc:postgresql://localhost:5432/Kafka_project', 'tasks.max': '1', 'topics': 'Q3_min_night', 'insert.mode': 'insert', 'connection.user': 'amrit', 'connection.password': '1234', 'table.name.format': 'Q3_min_night', 'auto.create': 'true', 'auto.evolve': 'true', 'value.converter': 'org.apache.kafka.connect.json.JsonConverter', 'value.converter.schemas.enable': 'true', 'auto.offset.reset': 'earliest', 'name': 'Q3_min_night-sink'}, 'tasks': [], 'type': 'sink'}


In [70]:
# connector status=
response = requests.get("http://localhost:8083/connectors/"+Q3_connector_name+"/status")
print(response.json())

{'name': 'Q3_min_night-sink', 'connector': {'state': 'RUNNING', 'worker_id': '127.0.1.1:8083'}, 'tasks': [{'id': 0, 'state': 'RUNNING', 'worker_id': '127.0.1.1:8083'}], 'type': 'sink'}
