# Abstracted Ingestion

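Notebook intended to run as a Databricks Job task. It ingests user feedback into the `feedback_content_bronze` table. The source is Steam reviews, Google Play reviews, or Reddit comments, selected by the `content_type` parameter.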

For testing/exploration, use the `Abstracted_Ingestion_testing` notebook instead.

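## Setup

Install the third-party API clients, restart Python so the newly installed packages can be imported, and then declare the job parameters as widgets.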

In [0]:
%pip install google-play-scraper praw

In [0]:
%restart_python

In [0]:
# Free-text job parameters, declared in order as (name, default value, label).
for widget_name, default_value, widget_label in (
    ("source_content_id", "com.nianticlabs.pokemongo", "Source Content ID"),
    ("game_name", "Pokemon Go", "Game Name"),
    ("catalog", "main", "Catalog"),
    ("schema", "social_listening", "Schema"),
):
    dbutils.widgets.text(widget_name, default_value, widget_label)

# Selects which ingestor runs; each choice must match a branch in the API cell below.
dbutils.widgets.dropdown(
    name="content_type",
    defaultValue="Google Play Review",
    choices=["Steam Review", "Google Play Review", "Reddit Comment"],
    label="Content Type",
)

# NOTE(review): update_type is read and printed, but no visible logic branches on it.
# Confirm whether REFRESH is supposed to behave differently from NEW_GAME.
dbutils.widgets.dropdown(
    name="update_type",
    defaultValue="NEW_GAME",
    choices=["NEW_GAME", "REFRESH"],
    label="Update Type",
)


In [0]:
# Resolve every job parameter from its widget, in the same order they were read before.
(
    SOURCE_CONTENT_ID,
    CONTENT_TYPE,
    GAME_NAME,
    UPDATE_TYPE,
    CATALOG,
    SCHEMA,
) = (
    dbutils.widgets.get(widget_key)
    for widget_key in (
        "source_content_id",
        "content_type",
        "game_name",
        "update_type",
        "catalog",
        "schema",
    )
)

# Three-part catalog.schema.table name of the Delta table updated by the merge cell.
BRONZE_TABLE_PATH = ".".join((CATALOG, SCHEMA, "feedback_content_bronze"))

# Echo the resolved parameters so they are visible in the job run output.
print("Parameter values:")
for param_label, param_value in (
    ("SOURCE_CONTENT_ID", SOURCE_CONTENT_ID),
    ("CONTENT_TYPE", CONTENT_TYPE),
    ("GAME_NAME", GAME_NAME),
    ("UPDATE_TYPE", UPDATE_TYPE),
    ("CATALOG", CATALOG),
    ("SCHEMA", SCHEMA),
    ("BRONZE_TABLE_PATH", BRONZE_TABLE_PATH),
):
    print(f"{param_label}: {param_value}")


In [0]:
from ingestion_utils import DataIngestor, SteamIngestor, GooglePlayIngestor, RedditIngestor

## Call the API for the Selected Content Type

In [0]:
import yaml


def _load_reddit_credentials(config_path='config/config.yaml'):
    """Fetch the Reddit API credentials from the Databricks secret scope.

    The YAML config names the secret scope (``secrets.scope_name``). Under
    ``secrets.keys.reddit`` it names the secret key that holds each credential.

    Args:
        config_path: Path to the YAML config file, resolved against the current
            working directory.

    Returns:
        dict mapping ``client_id``, ``client_secret`` and ``user_agent`` to the
        corresponding secret values, ready to pass as keyword arguments.

    Raises:
        KeyError: if an expected ``secrets`` entry is missing from the config.
    """
    with open(config_path, 'r') as config_file:
        # safe_load: never construct arbitrary Python objects from the config file.
        config_data = yaml.safe_load(config_file)

    scope_name = config_data['secrets']['scope_name']
    reddit_secret_keys = config_data['secrets']['keys']['reddit']
    # Kept local so secret values do not linger as notebook-level globals.
    return {
        credential: dbutils.secrets.get(scope=scope_name, key=reddit_secret_keys[credential])
        for credential in ('client_id', 'client_secret', 'user_agent')
    }


# Dispatch to the ingestor matching CONTENT_TYPE; each returns the new rows as a DataFrame.
output_df = None
if CONTENT_TYPE == "Steam Review":
    steam_ingestor = SteamIngestor(spark)
    output_df = steam_ingestor.ingest(app_id=SOURCE_CONTENT_ID, game_name=GAME_NAME)

elif CONTENT_TYPE == "Google Play Review":
    google_play_ingestor = GooglePlayIngestor(spark)
    output_df = google_play_ingestor.ingest(app_id=SOURCE_CONTENT_ID, game_name=GAME_NAME)

elif CONTENT_TYPE == "Reddit Comment":
    # For Reddit, SOURCE_CONTENT_ID is the subreddit name; API credentials come from secrets.
    reddit_ingestor = RedditIngestor(spark, **_load_reddit_credentials())
    output_df = reddit_ingestor.ingest(subreddit_name=SOURCE_CONTENT_ID)

else:
    # ValueError subclasses Exception, so any `except Exception` handling still catches it.
    raise ValueError(f"Invalid content type: {CONTENT_TYPE}")

# Total rows vs. distinct content_ids: a gap means the batch contains duplicate ids.
print(f"New rows from API: {output_df.count()}")
# Single quotes inside the f-string: reusing the outer quote is a SyntaxError before Python 3.12.
print(f"Distinct rows: {output_df.select('content_id').distinct().count()}")

## Update Unity Catalog (UC) Table

In [0]:
from delta.tables import DeltaTable

# Append the new batch to the bronze table, skipping content_ids that are already stored.
# Explicit None check: `if output_df:` only tests object truthiness, not whether rows exist.
if output_df is not None:
    # An insert-only MERGE inserts every unmatched source row. Duplicate content_ids within
    # this batch would therefore all be inserted, so collapse them first.
    # Which of the duplicate rows is kept is arbitrary.
    new_content_df = output_df.dropDuplicates(["content_id"])

    if not spark.catalog.tableExists(BRONZE_TABLE_PATH):
        # First run: there is nothing to de-duplicate against, so a plain write is enough.
        new_content_df.write.saveAsTable(BRONZE_TABLE_PATH)
    else:
        delta_table = DeltaTable.forName(spark, BRONZE_TABLE_PATH)
        (
            delta_table.alias("existing")
            .merge(
                source=new_content_df.alias("new"),
                condition="existing.content_id = new.content_id",
            )
            .whenNotMatchedInsertAll()
            .execute()
        )