<a href="https://colab.research.google.com/github/AlperYaanik/yelp-data-pipeline-kafka-spark/blob/main/yelp_kafka_spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Dataset Link**
>  https://business.yelp.com/data/resources/open-dataset/






# **SETUP AND INSTALLATION**

In [None]:
#Install Kafka and Apache Spark extensions for Python
!pip install kafka-python
!pip install pyspark

Collecting kafka-python
  Downloading kafka_python-2.2.3-py2.py3-none-any.whl.metadata (10.0 kB)
Downloading kafka_python-2.2.3-py2.py3-none-any.whl (307 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/307.5 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m307.2/307.5 kB[0m [31m13.3 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m307.5/307.5 kB[0m [31m8.7 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: kafka-python
Successfully installed kafka-python-2.2.3


In [None]:
#Download Kafka
!curl -sSOL https://dlcdn.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz
!tar -xzf kafka_2.13-3.9.0.tgz
!rm kafka_2.13-3.9.0.tgz

#Set up zookeeper
!./kafka_2.13-3.9.0/bin/zookeeper-server-start.sh -daemon ./kafka_2.13-3.9.0/config/zookeeper.properties
!./kafka_2.13-3.9.0/bin/kafka-server-start.sh -daemon ./kafka_2.13-3.9.0/config/server.properties
!echo "Waiting for 10 secs until kafka and zookeeper are up and running"
!sleep 10

!./kafka_2.13-3.9.0/bin/kafka-topics.sh --create --topic business_data --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092
!./kafka_2.13-3.9.0/bin/kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type topics --entity-name business_data \
  --alter --add-config max.message.bytes=2000000

Waiting for 10 secs until kafka and zookeeper are up and running
Created topic business_data.
Completed updating config for topic business_data.


In [None]:
#Import Extensions
import json
import os

from kafka import KafkaProducer
from kafka.errors import KafkaError, NoBrokersAvailable, KafkaTimeoutError

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col,from_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, MapType, DoubleType

# **KAFKA PART**

In [None]:
# Create the producer shared by the sending code below.
# Values are serialized as UTF-8 JSON and keys as UTF-8 strings. max_request_size is set to
# the same 2,000,000 bytes as the topic's max.message.bytes so large records are accepted.
try:
    producer = KafkaProducer(
        bootstrap_servers='localhost:9092',
        value_serializer=lambda v: json.dumps(v).encode('utf-8'),
        key_serializer=lambda v: str(v).encode('utf-8'),
        retries=5,
        max_request_size=2_000_000,
        retry_backoff_ms=1000,
        enable_idempotence=True,
        max_in_flight_requests_per_connection=1,
        acks='all',
    )
    print("Kafka Producer has been created")
except NoBrokersAvailable:
    print("Could not connect to the Kafka broker")
    # Re-raise: without a producer the rest of this cell cannot work, and carrying on
    # would only fail later with a NameError (or silently reuse a stale producer).
    raise


# Function to send JSON lines from a file
def send_json_file_to_kafka(file_path, topic):
    """Publish every JSON line of ``file_path`` to ``topic``, keyed by ``business_id``.

    Uses the module-level ``producer``. Lines that are not valid JSON, or that do not
    decode to an object containing ``business_id``, are skipped and counted; the count
    is printed at the end if it is non-zero. Send failures are printed rather than
    raised, so one bad record does not abort the whole file.

    Args:
        file_path: Path to a JSON Lines file (one JSON object per line).
        topic: Name of the Kafka topic to publish to.

    Returns:
        None.
    """
    def report_send_failure(exc):
        # Invoked through the future returned by send() when delivery fails later on.
        print(f"Kafka send error: {exc}")

    skipped = 0
    # Explicit encoding so reading does not depend on the platform's default.
    with open(file_path, 'r', encoding='utf-8') as jsonfile:
        for line in jsonfile:
            try:
                record = json.loads(line)
            except json.JSONDecodeError:
                skipped += 1
                continue  # skip malformed lines
            if not isinstance(record, dict) or 'business_id' not in record:
                skipped += 1
                continue  # nothing to key the message on
            try:
                # send() hands back a future; attaching an errback makes failures that
                # happen after the call returns visible, not just immediate ones.
                future = producer.send(topic, key=record['business_id'], value=record)
                future.add_errback(report_send_failure)
            except KafkaError as e:
                print(f"Kafka send error: {e}")

    try:
        # Block until queued records are delivered, or give up after 120 seconds.
        producer.flush(timeout=120)
    except KafkaTimeoutError as e:
        print(f"Timeout error: {e}")

    if skipped:
        print(f"Skipped {skipped} line(s) that were not JSON objects with a business_id")

# Publish the business file and then the check-in file to the same topic
# (both come from the Yelp academic dataset).
for dataset_path in ('/content/yelp_academic_dataset_business.json',
                     '/content/yelp_academic_dataset_checkin.json'):
    send_json_file_to_kafka(dataset_path, 'business_data')

Kafka Producer has been created


# **SPARK PART**

In [None]:
# Build the Spark session with the Kafka connector package, then batch-read one topic
# (the read defaults to the earliest and latest offsets).
spark = (
    SparkSession.builder
    .appName("KafkaExample")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.5")
    .getOrCreate()
)

df = (
    spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "business_data")
    .load()
)

# Keep only key and value, both cast to strings, and show the resulting column types
string_casts = ["CAST(key AS STRING)", "CAST(value AS STRING)"]
data_df = df.selectExpr(*string_casts)
print(data_df.dtypes)



[('key', 'string'), ('value', 'string')]


In [None]:
# Print the raw row count and a sample, then drop exact duplicate (key, value) rows
raw_row_count = data_df.count()
print(raw_row_count)
data_df.show(5, truncate=False)

df_distinct = data_df.distinct()
distinct_row_count = df_distinct.count()
print(distinct_row_count)

# Spot-check the rows stored under one business key after de-duplication
df_distinct.where(F.col("key") == "Pns2l4eNsfO8kk83dixA6A").show(truncate=False)


564552
+----------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
# Describe the JSON fields expected in `value` as (name, Spark type) pairs.
# `date` is presumably the check-in field, the rest the business fields (verify against the data).
field_types = [
    ("business_id", StringType()),
    ("name", StringType()),
    ("address", StringType()),
    ("city", StringType()),
    ("state", StringType()),
    ("postal_code", StringType()),
    ("latitude", DoubleType()),
    ("longitude", DoubleType()),
    ("stars", DoubleType()),
    ("review_count", IntegerType()),
    ("is_open", IntegerType()),
    ("attributes", MapType(StringType(), StringType())),
    ("categories", StringType()),
    ("hours", MapType(StringType(), StringType())),
    ("date", StringType()),
]
schema = StructType([StructField(field_name, field_type) for field_name, field_type in field_types])

# Parse `value` with the schema and keep only the parsed fields as top-level columns
parsed_value = from_json(col("value"), schema)
data_df_parsed = df_distinct.withColumn("data", parsed_value).select("data.*")

data_df_parsed.show(5, truncate = False)
data_df_parsed.printSchema()

+----------------------+-----------------------------------------------------+-----------------------+------------+-----+-----------+-------------+--------------+-----+------------+-------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
# Split the parsed rows into the business columns and the (business_id, date) pairs,
# then left-join them back together on business_id.
business_columns = [
    "business_id", "name", "address", "city", "state",
    "postal_code", "latitude", "longitude", "stars", "review_count", "is_open",
    "attributes", "categories", "hours",
]
data_part_one = data_df_parsed.select(*business_columns)
data_part_two = data_df_parsed.select("business_id", "date").where(col("date").isNotNull())

# A joined row is kept when at least one of these descriptive fields is present
descriptive_fields = ["name", "address", "city", "state", "postal_code"]
has_business_info = col(descriptive_fields[0]).isNotNull()
for field_name in descriptive_fields[1:]:
    has_business_info = has_business_info | col(field_name).isNotNull()

joined = data_part_one.join(data_part_two, on="business_id", how="left")
merged_data = joined.filter(has_business_info)

merged_data.show(5, truncate = False)
print(merged_data.count())

+----------------------+--------------------------------------------+------------------------+------------+-----+-----------+-------------+--------------+-----+------------+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
# Show one business before and after the join to confirm the merge worked
sample_business_id = "Pns2l4eNsfO8kk83dixA6A"
for frame in (data_df_parsed, merged_data):
    frame.filter(col("business_id") == sample_business_id).show(truncate = False)

+----------------------+------------------------+----------------------+-------------+-----+-----------+----------+------------+-----+------------+-------+---------------------------+----------------------------------------------------------------------------------------------------------+-----+-------------------+
|business_id           |name                    |address               |city         |state|postal_code|latitude  |longitude   |stars|review_count|is_open|attributes                 |categories                                                                                                |hours|date               |
+----------------------+------------------------+----------------------+-------------+-----+-----------+----------+------------+-----+------------+-------+---------------------------+----------------------------------------------------------------------------------------------------------+-----+-------------------+
|Pns2l4eNsfO8kk83dixA6A|Abby Rappoport, LAC, CMQ|

# **Save the Merged Dataset**

In [None]:
# Write the merged rows as JSON into the "merged_data" directory, overwriting any
# previous output; coalesce(1) brings the frame down to one partition before writing.
single_partition = merged_data.coalesce(1)
single_partition.write.mode("overwrite").json("merged_data")

In [None]:
# Give the written JSON file and its .crc companion descriptive names
output_dir = "merged_data"
new_name_by_suffix = {
    ".json": "yelp_academic_dataset_business_and_checkin.json",
    ".json.crc": ".yelp_academic_dataset_business_and_checkin.json.crc",
}

for entry in os.listdir(output_dir):
    for suffix, new_name in new_name_by_suffix.items():
        if entry.endswith(suffix):
            os.rename(os.path.join(output_dir, entry), os.path.join(output_dir, new_name))

print(os.listdir(output_dir))


['.yelp_academic_dataset_business_and_checkin.json.crc', '._SUCCESS.crc', 'yelp_academic_dataset_business_and_checkin.json', '_SUCCESS']
