In this notebook, we are interested to figure out correct data transformation strategy
1. [x] User Data
2. [x] Message Data
3. [x] Message Data within Airflow

## Constants and Common Functionalities

In [14]:
# Source Constants
DATA_SOURCE_ROOT = "../../dtc-capstone-data/slack-data" 
COURSE_CHANNEL = "course-data-engineering"
WELCOME_CHANNEL = "welcome"
USERS_DATA = "users.json"
# book-of-the-week
# announcements-course-data-engineering
# shameless-content

In [15]:
## Target Constants
PROJECT_ID = os.environ.get("GCP_PROJECT_ID", 'dtc-capstone-344019')
BUCKET = os.environ.get("GCP_GCS_BUCKET", 'dtc_capstone_data-lake')

In [16]:
import json
import yaml
import re
import os
import pandas as pd
from glob import glob

In [17]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('dtc-capstone') \
    .getOrCreate()

## Users-Data

We have single JSON file which store all Users information for the complete Data Talks Club Slak group.
This data contains, different set of attributes, where some of them repeatative and uninformative.
Therefore in this section we will decide columns that we are interested and continue the development by using that subet of data.

### 1. Read JSON file

In [None]:
users = spark.read.json(f'{DATA_SOURCE_ROOT}/{USERS_DATA}') 

In [None]:
users.printSchema()

### 2. Cleanup Unnecessary Columns

In [18]:
columns_to_drop = ['profile', 'color', 'who_can_share_contact_card', 'team_id', '_corrupt_record']
users = users.drop(*columns_to_drop)

In [19]:
# Check schema after cleanup
users.printSchema()
users.schema

root
 |-- deleted: boolean (nullable = true)
 |-- id: string (nullable = true)
 |-- is_admin: boolean (nullable = true)
 |-- is_app_user: boolean (nullable = true)
 |-- is_bot: boolean (nullable = true)
 |-- is_email_confirmed: boolean (nullable = true)
 |-- is_invited_user: boolean (nullable = true)
 |-- is_owner: boolean (nullable = true)
 |-- is_primary_owner: boolean (nullable = true)
 |-- is_restricted: boolean (nullable = true)
 |-- is_ultra_restricted: boolean (nullable = true)
 |-- name: string (nullable = true)
 |-- real_name: string (nullable = true)
 |-- tz: string (nullable = true)
 |-- tz_label: string (nullable = true)
 |-- tz_offset: long (nullable = true)
 |-- updated: long (nullable = true)



StructType(List(StructField(deleted,BooleanType,true),StructField(id,StringType,true),StructField(is_admin,BooleanType,true),StructField(is_app_user,BooleanType,true),StructField(is_bot,BooleanType,true),StructField(is_email_confirmed,BooleanType,true),StructField(is_invited_user,BooleanType,true),StructField(is_owner,BooleanType,true),StructField(is_primary_owner,BooleanType,true),StructField(is_restricted,BooleanType,true),StructField(is_ultra_restricted,BooleanType,true),StructField(name,StringType,true),StructField(real_name,StringType,true),StructField(tz,StringType,true),StructField(tz_label,StringType,true),StructField(tz_offset,LongType,true),StructField(updated,LongType,true)))

In [20]:
user_schema = types.StructType([
    types.StructField("deleted",types.BooleanType(),True),
    types.StructField("id",types.StringType(),True),
    types.StructField("is_admin",types.BooleanType(),True),
    types.StructField("is_app_user",types.BooleanType(),True),
    types.StructField("is_bot",types.BooleanType(),True),
    types.StructField("is_email_confirmed",types.BooleanType(),True),
    types.StructField("is_invited_user",types.BooleanType(),True),
    types.StructField("is_owner",types.BooleanType(),True),
    types.StructField("is_primary_owner",types.BooleanType(),True),
    types.StructField("is_restricted",types.BooleanType(),True),
    types.StructField("is_ultra_restricted",types.BooleanType(),True),
    types.StructField("name",types.StringType(),True),
    types.StructField("real_name",types.StringType(),True),
    types.StructField("tz",types.StringType(),True),
    types.StructField("tz_label",types.StringType(),True),
    types.StructField("tz_offset",types.IntegerType(),True),
    types.StructField("updated",types.LongType(),True)
    ]
)

In [22]:
# 2.2 (optional) After deciding schema we can read the json file again with that schema definition
users = spark.read.schema(user_schema).json(f'{DATA_SOURCE_ROOT}/{USERS_DATA}') 
users.printSchema()

### 3. Create SubSets for User Data

In [24]:
def format_user_data(d):
    identity = ["name", "real_name"]
    location = ["tz","tz_label", "tz_offset"]
    status = ["deleted", "is_admin", "is_owner", "is_primary_owner",
              "is_restricted","is_ultra_restricted","is_bot", 
              "is_email_confirmed"]
    
    user = {}
    for key, value in {"identifiers": identity, "location":location, "status":status}.items():
        user[key]={}
        user[key]["id"] = d["id"] # each group will have the id column
        for v in value:
            if v in d:
                user[key][v]=d[v]
            else:
                user[key][v]="None"   
                
    return user

### 4. Upload Clean Data in 4 different files.

### 5.Create 4 External Table from the Buckets and changed files

## Message-Data

In that section, we will check the datastructure of `course-data-engineering` channel. The slack-data for the channel consist of daily json files. Therefore we need to ensure how we ingest data to cloud. In below, you will see that we are going to handle messages monthly basis rather than running ingestion job daily.

### 1. Read All JSON files

In [96]:
# Find all related files 
files = sorted(glob(f'{DATA_SOURCE_ROOT}/{COURSE_CHANNEL}/*.json'))

# Analyse start and end of the data set
start, end = files[0].split("/")[-1].split(".")[0], files[-1].split("/")[-1].split(".")[0]
(start,end)

('2020-11-22', '2022-03-14')

In [97]:
# 1. Load the data into pyspark dataframe
messages_cde = spark.read.json(f'{DATA_SOURCE_ROOT}/{COURSE_CHANNEL}/*.json', multiLine=True) 
print((messages_cde.count(), len(messages_cde.columns)))

[Stage 204:>                                                        (0 + 1) / 1]

(9136, 32)


                                                                                

### 2. Analyse the Column Values

In [81]:
messages_cde.printSchema()

root
 |-- client_msg_id: string (nullable = true)
 |-- parent_user_id: string (nullable = true)
 |-- reactions: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- count: long (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- users: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |-- reply_count: long (nullable = true)
 |-- subtype: string (nullable = true)
 |-- text: string (nullable = true)
 |-- thread_ts: string (nullable = true)
 |-- ts: timestamp (nullable = true)
 |-- user: string (nullable = true)



In [32]:
messages_cde.select('subtype').distinct().collect()

[Row(subtype='tombstone'),
 Row(subtype=None),
 Row(subtype='thread_broadcast'),
 Row(subtype='channel_join'),
 Row(subtype='channel_topic')]

In [33]:
messages_cde.select('type').distinct().collect()

[Row(type='message')]

In [34]:
messages_cde.select('topic').distinct().collect()

[Row(topic=None),
 Row(topic='<https://docs.google.com/document/d/1TI1co3aRcYYBB9keAMPgrnQd84juN-i8bZiiXW_1oUo/edit?usp=sharing>'),
 Row(topic='<https://github.com/DataTalksClub/data-engineering-zoomcamp>')]

### 3. CleanUp Unnecessary Columns

In [98]:
from pyspark.sql.functions import explode, col

In [99]:
# 3.1 Remove Rows with Unwanted subtypes
messages_cde = messages_cde.where((col("subtype").isNull()) | ((col("subtype") != "thread_broadcast") & (col("subtype") != "channel_join")))

In [100]:
# 3.2 Keep the columns that makes sense for the rest
interest_columns = [
    "client_msg_id",
    "parent_user_id",
    "reactions",
    "reply_count",
    "subtype",
    "text",
    "thread_ts",
    "ts",
    "user",
]
messages_cde = messages_cde.select(interest_columns)

### 4. Transform Data

#### 4.1 Extract the Reactions from the Message data

In [101]:
# 4.1 Extract reactions into new dataframe
# In here caution that user columns is the owner of message where users put reaction.
reactions_df = messages_cde.where((col("reactions").isNotNull())) 
reactions_df = reactions_df.select(['client_msg_id',"user",'reactions'])
reactions_df = reactions_df.withColumn('reaction', explode('reactions')).drop("reactions")
reactions_df = reactions_df.select(['client_msg_id',"user","reaction.name", "reaction.count", "reaction.users"])
reactions_df.select(['client_msg_id',"user","name", "count", "users"]).show()

+--------------------+-----------+---------------+-----+--------------------+
|       client_msg_id|       user|           name|count|               users|
+--------------------+-----------+---------------+-----+--------------------+
|d751f16a-ca2a-403...|U02UEJMRB8X|         scream|    2|[U01AXE0P5M3, U02...|
|780394d7-0647-499...|U02QBJYQFK9|   raised_hands|    1|       [U02V24WAZRN]|
|425a4e1f-fd43-496...|U02QJJ30E05|   raised_hands|    1|       [U02U8FXSG1Y]|
|aa607d5d-55fe-45f...|U02U809EAE7|             +1|    1|       [U02U34YJ8C8]|
|a72b3aa0-db7b-488...|U02U809EAE7|             +1|    3|[U02U34YJ8C8, U02...|
|1ade0883-ba49-4d3...|U0290EYCA7Q|   raised_hands|    1|       [U0308865C0H]|
|1ade0883-ba49-4d3...|U0290EYCA7Q|             +1|    1|       [U0308865C0H]|
|29f12de9-4834-405...|U02R09ZR6FQ|             +1|    1|       [U02UY1QTGHW]|
|53f8696b-acd2-476...|U02U34YJ8C8|             +1|    1|       [U02HFP7UTFB]|
|3a7e75e9-1603-4b5...|U01AXE0P5M3|   raised_hands|    1|       [

In [52]:
unique_reactions = reactions_df.select('name').distinct().rdd.flatMap(lambda x: x).collect()
# unique_reactions

#### 4.2 Cleanup Messages from unwanted characters with Spark UDF

In [102]:
from pyspark.sql.functions import udf,col
from pyspark.sql.types import StringType

# 4 Clean Message from some unwanted characters
def clean_message_text(text):             
    user_pattern = re.compile(r'<@(.+?)>')
    link_pattern_text = re.compile(r'<(http.+?)\|(.+?)>')
    link_pattern = re.compile(r'<(http.+?)>')
    
    text = text.replace('\xa0', ' ').replace('•', '-').replace('\n\n', '\n').replace("'", '').replace("`", '')
    text = re.sub('\n', ' ', text) 
    text = user_pattern.sub("", text)
    text = link_pattern_text.sub("", text)
    text = link_pattern.sub("", text)
    return text

udf_clean_message_text = udf(lambda x:clean_message_text(x),StringType())

In [103]:
messages_cde = messages_cde.withColumn("text",udf_clean_message_text(col("text")))

In [104]:
messages_cde.select("text").head()

Traceback (most recent call last):
  File "/usr/local/Cellar/apache-spark/3.2.1/libexec/python/lib/pyspark.zip/pyspark/daemon.py", line 186, in manager
  File "/usr/local/Cellar/apache-spark/3.2.1/libexec/python/lib/pyspark.zip/pyspark/daemon.py", line 74, in worker
  File "/usr/local/Cellar/apache-spark/3.2.1/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 663, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/usr/local/Cellar/apache-spark/3.2.1/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line 564, in read_int
    raise EOFError
EOFError


Row(text='Hi , I had a lot of problems when trying to convert the .ipynb file to a py script. I copied exactly the code in the video and I kept getting the error No template sub-directory with name script can be found. After a bit of googling, I found that if I uninstalled nbconvert module and then reinstalled it with a lower version, the command would work. I was close to throwing my laptop out of the window by that point! Obviously we all have different computers, OS and python versions, so mentioning the module version would be very helpful!')

#### 4.3 Convert time columns from Epoch to Timestamp format with Spark UDF

In [105]:
from pyspark.sql.functions import udf,col, to_timestamp
from pyspark.sql.types import StringType,IntegerType, DateType
import time

In [106]:
def epoch_2_datetime(epoch):
    return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(epoch))

udf_epoch_2_datetime = udf(lambda x: epoch_2_datetime(x),StringType())

In [115]:
messages_cde = messages_cde.withColumn("ts", messages_cde["ts"].cast(IntegerType()))

messages_cde = messages_cde.withColumn("ts",udf_epoch_2_datetime(col("ts")))
messages_cde = messages_cde.withColumn("ts",to_timestamp(col("ts")))

In [117]:
messages_cde.head()

                                                                                

Row(client_msg_id='9e77c6e5-e288-4060-a6e6-3ec9185345c2', parent_user_id='U02TMP4GJEM', reactions=None, reply_count=None, subtype=None, text='Hi , I had a lot of problems when trying to convert the .ipynb file to a py script. I copied exactly the code in the video and I kept getting the error No template sub-directory with name script can be found. After a bit of googling, I found that if I uninstalled nbconvert module and then reinstalled it with a lower version, the command would work. I was close to throwing my laptop out of the window by that point! Obviously we all have different computers, OS and python versions, so mentioning the module version would be very helpful!', thread_ts='1642995076.257900', ts=datetime.datetime(2022, 3, 28, 17, 30, 2), user='U02TMP4GJEM')

#### 4.4 Split Root messages and Threaded messages

In [118]:
root_messages = messages_cde.where((col("parent_user_id").isNull()))
thread_replies = messages_cde.where((col("parent_user_id").isNotNull()))

print(f"All_messages  :{(messages_cde.count(), len(messages_cde.columns))}")
print(f"Root_messages :{(root_messages.count(), len(root_messages.columns))}")
print(f"Thread Replies:{(thread_replies.count(), len(thread_replies.columns))}")

All_messages  :(9087, 9)
Root_messages :(1862, 9)
Thread Replies:(7225, 9)


In [119]:
# Check which columns are all empty
from pyspark.sql import functions as F

def columns_not_in_use(df):
#     nonNull_cols = [c for c in df.columns if df.filter(F.col(c).isNotNull()).count() > 0]
    null_cols = [c for c in df.columns if df.filter(F.col(c).isNotNull()).count() == 0]
    return null_cols

In [120]:
columns_2_drop_root_messages = columns_not_in_use(root_messages)
dcolumns_2_drop_thread_replies = columns_not_in_use(thread_replies)
print(columns_2_drop_root_messages)
print("-----")
print(dcolumns_2_drop_thread_replies)



['parent_user_id']
-----
[]


                                                                                

## Message Data - Airflow DAG approach

The idea is run exactly similiar steps described in Message-Data section but with different order and with different tool set (read with pyspark but do majority of transformation with pandas)

There are couple of reasons for that.
1. Need to ensure data I am dealing with always have same format, and if there is mismatch then enforce it as null through schema. By doing that, I do not need write bunch of if checks within the function.
2. The Airflow has some issues to pickle UDFs, therefore I needed a different approach to apply transformations on column level and need to switch pandas.

### 1. Read JSON files with the Predefined Schema

In [121]:
message_schema = types.StructType([
    types.StructField("client_msg_id",types.StringType(),False),
    types.StructField("parent_user_id",types.StringType(),True),
    types.StructField("text",types.StringType(),True), #-> 
    types.StructField("type",types.StringType(),True),
    types.StructField("subtype",types.StringType(),True),
    types.StructField("user",types.StringType(),True), #-> id and user fk.
    types.StructField("ts",types.StringType(),True), #-> epoch to human readable format
    types.StructField("thread_ts",types.StringType(),True),
    types.StructField("reply_count",types.IntegerType(),True),
    types.StructField("reactions",types.ArrayType(types.StructType([
        types.StructField("count",types.LongType(),True),
        types.StructField("name" ,types.StringType(),True),
        types.StructField("users" ,types.ArrayType(types.StringType(),True),True)
        ])
    ),True),
    ]
)

In [122]:
message_data = spark.read.schema(message_schema) \
        .json(f'{DATA_SOURCE_ROOT}/{COURSE_CHANNEL}/2020-11-*.json', multiLine=True)

In [123]:
message_data.printSchema()

root
 |-- client_msg_id: string (nullable = true)
 |-- parent_user_id: string (nullable = true)
 |-- text: string (nullable = true)
 |-- type: string (nullable = true)
 |-- subtype: string (nullable = true)
 |-- user: string (nullable = true)
 |-- ts: string (nullable = true)
 |-- thread_ts: string (nullable = true)
 |-- reply_count: integer (nullable = true)
 |-- reactions: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- count: long (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- users: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)



In [126]:
from pyspark.sql.functions import col, explode, lit
message_data = message_data.withColumn("channel_name", lit(COURSE_CHANNEL))

### 2. Extract the Reactions from Message Data

In [127]:
def extract_reactions_data(df):
    reactions_df = df.where((col("reactions").isNotNull()))
    reactions_df = reactions_df.select(["client_msg_id", "user", "channel_name", "reactions"])
    reactions_df = reactions_df.withColumnRenamed("user", "msg_owner")

    reactions_df = reactions_df.withColumn("reaction", explode("reactions")).drop(
        "reactions"
    )
    reactions_df = reactions_df.select(
        ["client_msg_id", "msg_owner", "reaction.name", "reaction.count", "reaction.users", "channel_name"]
    )
    reactions_df = reactions_df.withColumn("msg_reactor", explode("users"))
    drop_cols = ["users", "count"]
    reactions_df = reactions_df.drop(*drop_cols)

    return reactions_df

In [128]:
reactions_df = extract_reactions_data(message_data)

In [129]:
reactions_df.columns

['client_msg_id', 'msg_owner', 'name', 'channel_name', 'msg_reactor']

### 3. Transform Data

#### 3.1 Remove Rows with specific Column Values

In [131]:
from pyspark.sql.functions import explode,col
message_data = message_data.where((col("subtype").isNull()) | ((col("subtype") != "thread_broadcast") & (col("subtype") != "channel_join")))

#### 3.2 Convert Data to Pandas

In [132]:
message_data = message_data.toPandas() 

In [133]:
message_data.info
message_data.dtypes

client_msg_id      object
parent_user_id     object
text               object
type               object
subtype            object
user               object
ts                 object
thread_ts          object
reply_count       float64
reactions          object
channel_name       object
dtype: object

#### 3.3 Cleanup Messages from unwanted characters  with Pandas

In [140]:
def clean_message_text(text):
    user_pattern = re.compile(r"<@(.+?)>")
    link_pattern_text = re.compile(r"<(http.+?)\|(.+?)>")
    link_pattern = re.compile(r"<(http.+?)>")

    text = (
        text.replace("\xa0", " ")
        .replace("•", "-")
        .replace("\n\n", "\n")
        .replace("'", "")
        .replace("`", "")
    )
    text = re.sub("\n", " ", text)
    text = user_pattern.sub("", text)
    text = link_pattern_text.sub("", text)
    text = link_pattern.sub("", text)
    return text.strip()

In [135]:
message_data['text'] = message_data['text'].apply(lambda x: clean_message_text(x))

#### 3.4 Split Root messages and Threaded messages¶

In [136]:
thread_replies = message_data[message_data.parent_user_id.notnull()]
root_messages = message_data[message_data.parent_user_id.isnull()]

#### 3.5 Convert time columns from Epoch to Timestamp format with Pandas

In [137]:
import time
def epoch_2_datetime(epoch):
    return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(epoch))

In [138]:
thread_replies = thread_replies.astype({"ts": float , "thread_ts": float})

thread_replies['ts'] = thread_replies['ts'].apply(lambda x: epoch_2_datetime(x))
thread_replies['thread_ts'] = thread_replies['thread_ts'].apply(lambda x: epoch_2_datetime(x))

In [139]:
root_messages = root_messages.astype({"ts": float , "thread_ts": float})

root_messages['ts'] = root_messages['ts'].apply(lambda x: epoch_2_datetime(x))