# Event correlation issues analysis (Analytics/Schema2 Telemetry)

### Synchronize all folders from the S3 bucket into the data/ directory.

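Each folder is named by an epoch-millisecond timestamp and contains gzip-compressed JSON event files. Example: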

- data/
  - 1709164800000/
    - gzip files
  - 1709251200000/
    - gzip files

### List all folders inside data/

In [None]:
import os

dataframes_root_path = "data"

# Keep only epoch-millisecond folder names (e.g. "1709164800000"): later cells
# do int(folder) / 1000, so any other directory (e.g. a stray
# ".ipynb_checkpoints") would make them crash.
# os.listdir returns entries in arbitrary order; sort numerically so folders
# are always processed chronologically and reproducibly.
files_dir = sorted(
    (
        f
        for f in os.listdir(dataframes_root_path)
        if f.isdecimal() and os.path.isdir(os.path.join(dataframes_root_path, f))
    ),
    key=int,
)
print(files_dir)

### Generate all.jsonl files (per folder & global)

In [None]:
import os
import shlex

global_all_file = "./data/all.jsonl"


def _run_shell(cmd):
    """Run *cmd* through the shell and warn if it exits with a non-zero status."""
    status = os.system(cmd)
    if status != 0:
        print(f"WARNING: exit status {status} from: {cmd}")
    return status


# Remove outputs of any previous run. The global file is built with `>>`
# below, so it must be deleted first; otherwise every re-run appends a
# duplicate copy of all events to it.
_run_shell(f"rm -f {shlex.quote(global_all_file)}")
for folder in files_dir:
    _run_shell(f"rm -f {shlex.quote(f'./data/{folder}/all.jsonl')}")

for folder in files_dir:
    target_dir = f"./data/{folder}"
    target_all_file = f"{target_dir}/all.jsonl"
    print(f"Processing dir: {target_dir}...")
    # Decompress every file in the folder and re-emit each JSON document
    # compactly (one per line) into the folder's all.jsonl. all.jsonl itself
    # is excluded from `find`: the `>` redirect can create it while find is
    # still listing the directory, and zcat would then choke on it.
    cmd = (
        f"find {shlex.quote(target_dir)} -type f ! -name all.jsonl"
        f" | parallel --bar 'zcat {{}} | jq -c \".\"'"
        f" > {shlex.quote(target_all_file)}"
    )
    _run_shell(cmd)
    # Append this folder's events to the global file.
    _run_shell(f"cat {shlex.quote(target_all_file)} >> {shlex.quote(global_all_file)}")


### Import & Initialize PySpark

In [None]:
import findspark
findspark.init()  # must run before pyspark is imported so it can be located
import pyspark

sc = pyspark.SparkContext(appName="Pi")
sqlContext = pyspark.SQLContext(sc)
# SparkContext.getConf() returns a *copy* of the configuration, so setting a
# value on it after the context exists is silently ignored. Set this SQL
# option on the session instead; it controls how many fields Spark prints in
# debug output (e.g. wide schemas/plans).
sqlContext.setConf("spark.sql.debug.maxToStringFields", "1000")

### For each folder, log its timestamp, event count, and the distinct event dates it contains

In [None]:
from datetime import datetime
from pyspark.sql.functions import col, date_format

def process_dir(dir):
    """Print a summary of one timestamped data folder.

    Reads ``./data/<dir>/all.jsonl`` and prints the folder's date, the number
    of events in it, and the distinct dates (``MM-dd-yyyy``) found in the
    events' ``timestamp`` field.

    Args:
        dir: Folder name under ``./data``; must parse as an integer number of
            epoch milliseconds.
    """
    target_all_file = f"./data/{dir}/all.jsonl"
    # NOTE(review): fromtimestamp() renders the folder date in the notebook's
    # local timezone; if folder names are UTC midnights, confirm this is intended.
    folder_date = datetime.fromtimestamp(int(dir) / 1000.0)
    dataframe_all = sqlContext.read.json(target_all_file)
    print("==============================================================")
    print(f"=====> {folder_date}")
    print(f"       COUNT = {dataframe_all.count()}")
    dataframe_all.select(date_format(col("timestamp"), "MM-dd-yyyy").alias("date_format")).distinct().show()
    print("==============================================================")

for folder in files_dir:
    process_dir(folder)


### Load all Schema2 events from the global all.jsonl and split them by event type

In [None]:
# col is not used in this cell but is kept for the correlation cells below.
# array_size was never used (and only exists in pyspark >= 3.5).
from pyspark.sql.functions import col  # noqa: F401

target_all_file = "./data/all.jsonl"
dataframe_all = sqlContext.read.json(target_all_file)

# Split the global event stream by event type. The aliases let later joins
# reference columns as e.g. "completions.properties.suggestion_id".
completions = dataframe_all.filter("event == 'Recommendation Generated'").alias("completions")
action_feedbacks = dataframe_all.filter("event == 'Recommendation Action'").alias("action_feedbacks")
product_feedbacks = dataframe_all.filter("event == 'Product Feedback'").alias("product_feedbacks")

total_events = dataframe_all.count()
print(f"Total Events: #{total_events}")
total_completions = completions.count()
print(f"Total Completions: #{total_completions}")
total_action_feedbacks = action_feedbacks.count()
print(f"Total Action Feedbacks: #{total_action_feedbacks}")
total_product_feedbacks = product_feedbacks.count()
print(f"Total Product Feedbacks: #{total_product_feedbacks}")
print("")

### Find completions without correlated feedback events (and feedback events without a correlated completion)

In [None]:
from pyspark.sql.functions import col

# Completions and action feedbacks are correlated through
# properties.suggestion_id. The qualified column names resolve through the
# "completions" / "action_feedbacks" DataFrame aliases; the same equality is
# reused for all three joins below.
suggestion_id_match = col("completions.properties.suggestion_id") == col(
    "action_feedbacks.properties.suggestion_id"
)

# Inner join: this counts joined rows, so a completion matched by N feedback
# events contributes N to the total.
completions_correlated_feedbacks = completions.join(
    action_feedbacks, on=suggestion_id_match, how="inner"
)
total_completions_correlated_feedbacks = completions_correlated_feedbacks.count()
print(f"Total correlated Completions-Feedback: #{total_completions_correlated_feedbacks}")

# Left anti joins keep left-side rows that have no match on the right. Rows
# whose suggestion_id is null never satisfy the equality, so they always
# appear in these results.
completions_missing_feedbacks = completions.join(
    action_feedbacks, on=suggestion_id_match, how="left_anti"
)
total_completions_missing_feedbacks = completions_missing_feedbacks.count()
print(f"Total Completions Missing Feedback: #{total_completions_missing_feedbacks}")

feedbacks_missing_completions = action_feedbacks.join(
    completions, on=suggestion_id_match, how="left_anti"
)
total_feedbacks_missing_completions = feedbacks_missing_completions.count()
print(f"Total Feedback Missing Completions: #{total_feedbacks_missing_completions}")

### Identify which RH org IDs send completions without correlated feedback events

In [None]:
# Imported here so this cell does not depend on another cell's imports.
from pyspark.sql.functions import col

# Completions with no action feedback sharing their suggestion_id.
# Useful columns to inspect on these rows: userId, timestamp,
# originalTimestamp, receivedAt, sentAt, properties.suggestion_id.
completions_missing_feedbacks_tests = completions.join(
    action_feedbacks,
    on=col("completions.properties.suggestion_id") == col("action_feedbacks.properties.suggestion_id"),
    how="left_anti",
)
total_completions_missing_feedbacks_tests = completions_missing_feedbacks_tests.count()
print(f"Total Completions Missing Feedback: #{total_completions_missing_feedbacks_tests}")

# Top 20 org IDs by number of uncorrelated completions; completions without
# an rh_user_org_id are grouped under a null key.
(
    completions_missing_feedbacks_tests.groupBy(["completions.properties.rh_user_org_id"])
    .count()
    .orderBy("count", ascending=False)
    .show(n=20, truncate=False)
)


In [None]:
from datetime import datetime
from pyspark.sql.functions import col

# Repeat the completion/feedback correlation for a single folder; change
# `folder` to analyze another one.
# NOTE: this rebinds the notebook-wide dataframe_all, completions,
# action_feedbacks and target_all_file to this folder's data, so re-run the
# global load cell before re-running the global correlation cells.
folder = "1709164800000"
print(f"************* FOLDER :: {folder} **************")
target_all_file = f"./data/{folder}/all.jsonl"
folder_date = datetime.fromtimestamp(int(folder) / 1000.0)
print(f"Folder date: {folder_date}")

dataframe_all = sqlContext.read.json(target_all_file)
completions = dataframe_all.filter("event == 'Recommendation Generated'").alias("completions")
action_feedbacks = dataframe_all.filter("event == 'Recommendation Action'").alias("action_feedbacks")

total_completions = completions.count()
print(f"Total Completions: #{total_completions}")
total_action_feedbacks = action_feedbacks.count()
print(f"Total Action Feedbacks: #{total_action_feedbacks}")

# Completions with no action feedback sharing their suggestion_id.
# https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.join.html
completions_missing_feedbacks = completions.join(
    action_feedbacks,
    on=col("completions.properties.suggestion_id") == col("action_feedbacks.properties.suggestion_id"),
    how="left_anti",
)
total_completions_missing_feedbacks = completions_missing_feedbacks.count()
print(f"Total Completions Missing Feedback: #{total_completions_missing_feedbacks}")

# Top 20 org IDs by number of uncorrelated completions in this folder.
(
    completions_missing_feedbacks.groupBy(["completions.properties.rh_user_org_id"])
    .count()
    .orderBy("count", ascending=False)
    .show(n=20, truncate=False)
)
