In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json,col, isnan, when, lower
from pyspark.sql.types import StructType, StringType, FloatType, IntegerType, StructField,DoubleType,T
from pyspark.sql import functions as F

In [2]:
spark = (
    SparkSession.builder.master("local[1]")
    .appName("solution")
    .config(
        "spark.jars.packages",
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1",
    )
    .getOrCreate()
)

In [3]:
spark

In [4]:
landslides_schema = StructType([
    StructField("id", StringType()),
    StructField("distance", StringType()),
    StructField("landslide_size", StringType()),
    StructField("injuries", StringType()),
    StructField("fatalities", StringType()),
     StructField("location_columns", StructType([
         StructField("country_code", StringType()),
         StructField("state/province", StringType()),
         StructField("city/town", StringType()),
         StructField("population", StringType()),
         StructField("latitude", StringType()),
         StructField("longitude", StringType())
     ])),
        StructField("time_columns", StructType([
        StructField("date", StringType()),
        StructField("time", StringType()),
     ]))
])


In [5]:
data = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "landslides_topic") \
  .option("failOnDataLoss", "true") \
  .load()
data.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [6]:
parse_data = data.selectExpr("CAST(value AS STRING)").select(from_json(col("value"), landslides_schema).alias("data")).select("data.*")
parse_data.show(5)
parse_data.printSchema()

+---+--------+--------------+--------+----------+--------------------+---------------+
| id|distance|landslide_size|injuries|fatalities|    location_columns|   time_columns|
+---+--------+--------------+--------+----------+--------------------+---------------+
| 34| 3.40765|         Small|    null|      null|{US, Virginia, Ch...|{3/2/07, Night}|
| 42| 3.33522|         Small|    null|      null|{US, Ohio, New Ph...|{3/22/07, null}|
| 56| 2.91977|         Small|    null|      null|{US, Pennsylvania...| {4/6/07, null}|
| 59| 2.98682|         Small|    null|      null|{CA, Quebec, Chât...|{4/14/07, null}|
| 61| 5.66542|         Small|    null|       0.0|{US, Kentucky, Pi...|{4/15/07, null}|
+---+--------+--------------+--------+----------+--------------------+---------------+
only showing top 5 rows

root
 |-- id: string (nullable = true)
 |-- distance: string (nullable = true)
 |-- landslide_size: string (nullable = true)
 |-- injuries: string (nullable = true)
 |-- fatalities: string (nu

In [7]:
parse_data.count()

3386

In [8]:
denest_df = parse_data.dropDuplicates()

In [9]:
denest_df.drop('time').show(10)

+----+--------+--------------+--------+----------+--------------------+------------------+
|  id|distance|landslide_size|injuries|fatalities|    location_columns|      time_columns|
+----+--------+--------------+--------+----------+--------------------+------------------+
| 277| 2.79113|        Medium|    null|       3.0|{GT, Guatemala, G...|   {9/22/07, null}|
| 984| 4.86398|        Medium|    null|       0.0|{DO, Santiago, Pe...|   {2/12/09, null}|
|2156| 4.00979|         Small|    null|       0.0|{TT, Diego Martin...|   {7/29/10, null}|
|3130|  3.7758|        Medium|    null|       0.0|{JM, Saint Mary, ...|   {2/16/11, null}|
|3825| 3.75018|        Medium|    null|       0.0|{US, Florida, Wil...|   {7/21/11, null}|
|4613| 4.23278|        Medium|    null|      null|{US, North Caroli...|  {11/14/12, null}|
|4715| 5.63904|        Medium|    null|      null|{US, North Caroli...|   {1/15/13, null}|
|4922| 1.41843|         Small|    null|       0.0|{US, New York, Mi...|{6/16/13, 0:00:00}|

In [10]:

dinest_df = parse_data.select(
    "id",
    "distance",
    "landslide_size",
    "injuries",
    "fatalities",
    "location_columns.*",
    "time_columns.*"
    
)
dinest_df.show(10)

+---+--------+--------------+--------+----------+------------+----------------+----------------+----------+--------+---------+-------+-----+
| id|distance|landslide_size|injuries|fatalities|country_code|  state/province|       city/town|population|latitude|longitude|   date| time|
+---+--------+--------------+--------+----------+------------+----------------+----------------+----------+--------+---------+-------+-----+
| 34| 3.40765|         Small|    null|      null|          US|        Virginia|     Cherry Hill|     16000| 38.6009| -77.2682| 3/2/07|Night|
| 42| 3.33522|         Small|    null|      null|          US|            Ohio|New Philadelphia|     17288| 40.5175| -81.4305|3/22/07| null|
| 56| 2.91977|         Small|    null|      null|          US|    Pennsylvania|     Wilkinsburg|     15930| 40.4377|  -79.916| 4/6/07| null|
| 59| 2.98682|         Small|    null|      null|          CA|          Quebec|     Châteauguay|     42786| 45.3226| -73.7771|4/14/07| null|
| 61| 5.66542

In [11]:
dinest_df = dinest_df.fillna({"latitude": dinest_df.agg({'latitude': 'avg'}).collect()[0][0]})
dinest_df = dinest_df.fillna({"longitude": dinest_df.agg({'longitude': 'avg'}).collect()[0][0]})
dinest_df = dinest_df.fillna({"distance": dinest_df.agg({'distance': 'avg'}).collect()[0][0]})


In [12]:

df_not_null = dinest_df.filter((F.col("date").isNotNull()) & \
            (F.col("state/province").isNotNull()) & \
             (F.col("city/town").isNotNull()))

df_not_null.show()

+---+--------+--------------+--------+----------+------------+-----------------+--------------------+----------+--------+---------+-------+-----+
| id|distance|landslide_size|injuries|fatalities|country_code|   state/province|           city/town|population|latitude|longitude|   date| time|
+---+--------+--------------+--------+----------+------------+-----------------+--------------------+----------+--------+---------+-------+-----+
| 34| 3.40765|         Small|    null|      null|          US|         Virginia|         Cherry Hill|     16000| 38.6009| -77.2682| 3/2/07|Night|
| 42| 3.33522|         Small|    null|      null|          US|             Ohio|    New Philadelphia|     17288| 40.5175| -81.4305|3/22/07| null|
| 56| 2.91977|         Small|    null|      null|          US|     Pennsylvania|         Wilkinsburg|     15930| 40.4377|  -79.916| 4/6/07| null|
| 59| 2.98682|         Small|    null|      null|          CA|           Quebec|         Châteauguay|     42786| 45.3226| -7

In [14]:
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, DateType

dinest_df = dinest_df.withColumn("id", F.col("injuries").cast(IntegerType())) \
                     .withColumn("injuries", F.col("injuries").cast(IntegerType())) \
                     .withColumn("fatalities", F.col("fatalities").cast(IntegerType())) \
                     .withColumn("date", F.col("date").cast(DateType()))


In [16]:
dinest_df.show()

+----+--------+--------------+--------+----------+------------+-----------------+--------------------+----------+--------+---------+----+-----+
|  id|distance|landslide_size|injuries|fatalities|country_code|   state/province|           city/town|population|latitude|longitude|date| time|
+----+--------+--------------+--------+----------+------------+-----------------+--------------------+----------+--------+---------+----+-----+
|null| 3.40765|         Small|    null|      null|          US|         Virginia|         Cherry Hill|     16000| 38.6009| -77.2682|null|Night|
|null| 3.33522|         Small|    null|      null|          US|             Ohio|    New Philadelphia|     17288| 40.5175| -81.4305|null| null|
|null| 2.91977|         Small|    null|      null|          US|     Pennsylvania|         Wilkinsburg|     15930| 40.4377|  -79.916|null| null|
|null| 2.98682|         Small|    null|      null|          CA|           Quebec|         Châteauguay|     42786| 45.3226| -73.7771|null

In [18]:
fixed_df = dinest_df.withColumn("injuries", F.when(col("injuries").isNull(), 0).otherwise(col("injuries"))) \
    .withColumn("fatalities", F.when(col("fatalities").isNull(), 0).otherwise(col("fatalities")))
fixed_df

DataFrame[id: int, distance: string, landslide_size: string, injuries: int, fatalities: int, country_code: string, state/province: string, city/town: string, population: string, latitude: string, longitude: string, date: date, time: string]

In [19]:
lowercased_data = fixed_df.withColumn("landslide_size", F.expr("lower(landslide_size)"))

In [21]:
lowercased_data.show()

+----+--------+--------------+--------+----------+------------+-----------------+--------------------+----------+--------+---------+----+-----+
|  id|distance|landslide_size|injuries|fatalities|country_code|   state/province|           city/town|population|latitude|longitude|date| time|
+----+--------+--------------+--------+----------+------------+-----------------+--------------------+----------+--------+---------+----+-----+
|null| 3.40765|         small|       0|         0|          US|         Virginia|         Cherry Hill|     16000| 38.6009| -77.2682|null|Night|
|null| 3.33522|         small|       0|         0|          US|             Ohio|    New Philadelphia|     17288| 40.5175| -81.4305|null| null|
|null| 2.91977|         small|       0|         0|          US|     Pennsylvania|         Wilkinsburg|     15930| 40.4377|  -79.916|null| null|
|null| 2.98682|         small|       0|         0|          CA|           Quebec|         Châteauguay|     42786| 45.3226| -73.7771|null

In [22]:
Country_schema = StructType([
    StructField("country_name", StringType()),
    StructField("country_code", StringType()),])

In [25]:
new_data = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "countries_topic") \
  .option("failOnDataLoss", "true") \
  .load()
new_data.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [27]:
new_data.show(5)

+----+--------------------+---------------+---------+------+--------------------+-------------+
| key|               value|          topic|partition|offset|           timestamp|timestampType|
+----+--------------------+---------------+---------+------+--------------------+-------------+
|null|[7B 22 63 6F 75 6...|countries_topic|        0|     0|2023-07-13 13:17:...|            0|
|null|[7B 22 63 6F 75 6...|countries_topic|        0|     1|2023-07-13 13:17:...|            0|
|null|[7B 22 63 6F 75 6...|countries_topic|        0|     2|2023-07-13 13:17:...|            0|
|null|[7B 22 63 6F 75 6...|countries_topic|        0|     3|2023-07-13 13:17:...|            0|
|null|[7B 22 63 6F 75 6...|countries_topic|        0|     4|2023-07-13 13:17:...|            0|
+----+--------------------+---------------+---------+------+--------------------+-------------+
only showing top 5 rows



In [30]:
country_data = new_data.selectExpr("CAST(value AS STRING)").select(from_json(col("value"), Country_schema).alias("aws")).select("aws.*")

In [31]:
country_data.printSchema()

root
 |-- country_name: string (nullable = true)
 |-- country_code: string (nullable = true)



In [32]:
country_df.show()

+------------+------------------+
|country_name|      country_code|
+------------+------------------+
|          US|     United States|
|          CA|            Canada|
|          CO|          Colombia|
|          EC|           Ecuador|
|          MX|            Mexico|
|          DO|Dominican Republic|
|          GT|         Guatemala|
|          JM|           Jamaica|
|          DM|          Dominica|
|          PK|          Pakistan|
|          NI|         Nicaragua|
|          SV|       El Salvador|
|          CR|        Costa Rica|
|          HT|             Haiti|
|          PR|       Puerto Rico|
|          PE|              Peru|
|          PA|            Panama|
|          LC|       Saint Lucia|
|          HN|          Honduras|
|          BB|          Barbados|
+------------+------------------+
only showing top 20 rows



In [33]:
countries_df_with_schema = country_df.withColumnRenamed("country_name", "country_temp")
countries_df_with_schema = countries_df_with_schema.withColumnRenamed("country_code", "country_name")
countries_df_with_schema = countries_df_with_schema.withColumnRenamed("country_temp", "country_code")

In [34]:
countries_df_with_schema.show()

+------------+------------------+
|country_code|      country_name|
+------------+------------------+
|          US|     United States|
|          CA|            Canada|
|          CO|          Colombia|
|          EC|           Ecuador|
|          MX|            Mexico|
|          DO|Dominican Republic|
|          GT|         Guatemala|
|          JM|           Jamaica|
|          DM|          Dominica|
|          PK|          Pakistan|
|          NI|         Nicaragua|
|          SV|       El Salvador|
|          CR|        Costa Rica|
|          HT|             Haiti|
|          PR|       Puerto Rico|
|          PE|              Peru|
|          PA|            Panama|
|          LC|       Saint Lucia|
|          HN|          Honduras|
|          BB|          Barbados|
+------------+------------------+
only showing top 20 rows



In [35]:
joined_data = dinest_df.join(
    country_df,
    dinest_df["country_code"] == country_df["country_code"],
    "left"
).drop(country_df["country_code"])

In [37]:
joined_data.show()

+----+--------+--------------+--------+----------+------------+-----------------+--------------------+----------+--------+---------+----+-----+------------+
|  id|distance|landslide_size|injuries|fatalities|country_code|   state/province|           city/town|population|latitude|longitude|date| time|country_name|
+----+--------+--------------+--------+----------+------------+-----------------+--------------------+----------+--------+---------+----+-----+------------+
|null| 9.51003|        Medium|    null|         7|          MX|   Veracruz-Llave|Laguna Chica (Pue...|      1947| 18.5369| -96.8229|null| null|        null|
|null| 2.98682|         Small|    null|      null|          CA|           Quebec|         Châteauguay|     42786| 45.3226| -73.7771|null| null|        null|
|null| 1.74759|         Small|    null|      null|          CA|          Ontario|              Ottawa|    812129| 45.4257| -75.6896|null| null|        null|
|null| 4.74385|        Medium|    null|         5|        

In [38]:
joined_data.write.format("json").save("output/data")