# Chat 💬 (Bronze → Silver)

## Init

Import dependencies

In [None]:
import findspark
from decouple import Config, RepositoryEnv
import os

In [None]:
# Locate the local Spark installation so that pyspark can be imported.
findspark.init()
# Fetch the Microsoft SQL Server JDBC driver from Maven. Every connection in this
# notebook uses a jdbc:sqlserver:// URL and the writer names
# com.microsoft.sqlserver.jdbc.SQLServerDriver, so no MySQL connector is needed.
# findspark.add_packages adds "--packages" to the spark-submit arguments, so it
# has to run before the SparkSession is created.
findspark.add_packages('com.microsoft.sqlserver:mssql-jdbc:12.6.1.jre8')

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

Load the configuration values from the *.env* file

In [None]:
# Database connection settings, read from the project's .env file.
# The path is resolved against the current working directory (typically the
# notebook's own folder under Jupyter). A path built from __file__ cannot be
# used here because __file__ is not defined inside a notebook.
env = Config(RepositoryEnv('../.env'))
db_server, db_name, db_user, db_password = (
    env.get(setting_name)
    for setting_name in ('DB_IP', 'DB_DATABASE', 'DB_USERNAME', 'DB_PASSWORD')
)

## Start

Create the Spark session

In [None]:
# Build the Spark session, or reuse the one already running in this kernel.
# NOTE(review): the SQL Server JDBC driver must be on Spark's classpath. Besides
# --packages, it can be supplied with .config("spark.jars", "<path to mssql-jdbc jar>").
spark = (
    SparkSession.builder
    .appName("PySpark SQL Server Connection")
    .getOrCreate()
)

Connect to the database and **extract** the data from the bronze table

In [None]:
# JDBC connection string for the SQL Server instance configured in .env.
# Credentials must only ever come from .env; never hard-code them in this notebook.
# encrypt=true + trustServerCertificate=true encrypts the connection but does NOT
# validate the server's certificate.
# NOTE(review): the user/password are embedded in the URL, so they may surface in
# logs or the Spark UI. Consider the JDBC "user"/"password" options instead (the
# load cell reuses this URL, so change both places together).
jdbc_url = f"jdbc:sqlserver://{db_server};databaseName={db_name};user={db_user};password={db_password};encrypt=true;trustServerCertificate=true;"
# Bronze (staging) table with the raw Twitch IRC payloads.
db_table = "Twitch.MessagesStg"

# NOTE: despite its name this DataFrame holds chat payloads, not employees; the
# name is kept because later cells refer to it.
employees_df = (
    spark.read
    .format("jdbc")
    # Same explicit driver class as the write to the silver table.
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    .option("url", jdbc_url)
    .option("dbtable", db_table)
    .load()
)

employees_df.show(truncate=False)

**Transform** data

In [None]:
# Marker inserted in front of every PRIVMSG command, so that one raw payload
# (several IRC lines) can be split into one row per chat message.
# NOTE(review): a chat message that literally contains this text would be split
# in the wrong place.
SPLIT_SEPARATOR = "SPLIT-SEPARATOR"

# Flatten line breaks: IRC lines end in CRLF, and a bare LF is handled too.
# Each line break becomes exactly one space.
e1_df = employees_df.withColumn('MsgResponse', regexp_replace(col('MsgResponse'), r'\r?\n', ' '))

# Insert the marker at the beginning of each ":nick!user@host.tmi.twitch.tv PRIVMSG"
# command so we know where to split. The pattern is a raw string because "\." in a
# plain literal is an invalid escape sequence (SyntaxWarning since Python 3.12).
e1_df = e1_df.withColumn(
    'MsgResponse',
    regexp_replace(
        col('MsgResponse'),
        r'(:[^:!]+![^!@]+@[^@.]+\.tmi\.twitch\.tv PRIVMSG)(?!$)',
        SPLIT_SEPARATOR + '$1',
    ),
)
e1_df = e1_df.withColumn('MessageResponseSplit', split(col('MsgResponse'), SPLIT_SEPARATOR))
# Drop the first element of the split list: it holds whatever preceded the first
# PRIVMSG (an empty string when the payload starts with one).
e1_df = e1_df.withColumn('MessageResponseSplit', expr('slice(MessageResponseSplit, 2, SIZE(MessageResponseSplit))'))
# One row per chat command.
e1_df = e1_df.withColumn('MsgResponse', explode(col('MessageResponseSplit')))

# Split on spaces to get each section of the command: "<prefix> <command> <#channel> :<text>".
splitted_col = split(e1_df['MsgResponse'], ' ')

e1_df = e1_df.withColumn('Username', regexp_extract(splitted_col.getItem(0), ':(.*)!', 1))
e1_df = e1_df.withColumn('Command', splitted_col.getItem(1))
e1_df = e1_df.withColumn('Channel', regexp_replace(splitted_col.getItem(2), '#', ''))
# Aux: the header in front of the message text, e.g. ":nick!... PRIVMSG #chan :".
e1_df = e1_df.withColumn(
    'MsgResponseMeta',
    concat_ws(' ', splitted_col.getItem(0), splitted_col.getItem(1), splitted_col.getItem(2), lit(':')),
)
# Message text = everything after the header. The header is cut off by position,
# not used as a regex pattern (where "." would be a wildcard). The result is
# trimmed to remove the padding spaces left by the line-break flattening above.
# If the header is somehow not a prefix, the whole (trimmed) text is kept.
e1_df = e1_df.withColumn(
    'Message',
    when(
        col('MsgResponse').startswith(col('MsgResponseMeta')),
        trim(expr('substring(MsgResponse, length(MsgResponseMeta) + 1)')),
    ).otherwise(trim(col('MsgResponse'))),
)

# Drop columns that are not needed (a new Id is generated in the next cell).
e1_df = e1_df.drop('Id', 'MsgResponse', 'MessageResponseSplit', 'MsgResponseMeta')

e1_df.show(truncate=False)

In [None]:
# Keep only chat commands ('PRIVMSG') and attach an id to every remaining record.
# monotonically_increasing_id() gives unique, increasing ids; they are not
# consecutive and are regenerated on each run.
e2_df = (
    e1_df
    .filter(e1_df['Command'] == 'PRIVMSG')
    .withColumn('Id', monotonically_increasing_id())
    .select('Id', 'Date', 'Username', 'Command', 'Channel', 'Message')
)

e2_df.show(truncate=False)

**Load** the data into the silver table in the database

In [None]:
# Silver table that receives the parsed chat messages.
output_table = 'Twitch.MessagesRef'

# "overwrite" replaces the table's contents on every run.
# NOTE(review): without the JDBC "truncate" option Spark drops and recreates the
# table, so indexes/column types defined on it in SQL Server are not preserved.
# Confirm this is intended.
(
    e2_df.write
    .format("jdbc")
    .mode("overwrite")
    .options(
        driver="com.microsoft.sqlserver.jdbc.SQLServerDriver",
        url=jdbc_url,
        dbtable=output_table,
    )
    .save()
)

Stop the Spark session

In [None]:
# Shut down the Spark session (and its underlying SparkContext) now that the
# bronze -> silver load has run.
spark.stop()