In [None]:
%load_ext autoreload
%autoreload 2

import sys

# Extra import roots, presumably so the project package (``ams``) can be imported from the
# /home/jovyan/work layout; each is appended to sys.path at most once.
paths_to_add = ['/home/jovyan/work']

for extra_path in paths_to_add:
    if extra_path in sys.path:
        continue
    sys.path.append(extra_path)

print(sys.path)
# Selects which path layout the configuration cell uses (relative/constants paths when True,
# /home/jovyan/work paths when False).
native_spark = True

import pandas as pd
from ams.services import spark_service

import findspark


from pyspark.sql import SparkSession, udf
from pyspark.sql.functions import udf, struct
from ams.services import twitter_service, file_services
from pyspark.sql import functions as F
from pathlib import Path
from pyspark.sql.types import StringType, StructType, StructField, BooleanType, MapType, ArrayType, Row
import json
from typing import Dict, List
import re
from pyspark.sql import DataFrame
from pyspark.sql.functions import explode
from pyspark.sql.types import IntegerType
import time

# Widen pandas' console/HTML rendering so wide, flattened tweet frames are not truncated.
for option_name, option_value in (
    ('display.max_rows', 500),
    ('display.max_columns', 500),
    ('display.width', 1000),
):
    pd.set_option(option_name, option_value)

In [None]:
# Create the Spark session for this notebook (via the project's spark_service helper) and obtain
# a log4j logger from the JVM side of the SparkContext.
# NOTE(review): findspark.init() is normally called *before* pyspark is imported so it can put
# pyspark on sys.path; pyspark is already imported in the first cell, so this call is presumably
# a no-op here — confirm and consider moving it ahead of the pyspark imports.
findspark.init()
spark = spark_service.get_or_create(app_name='twitter_flatten')
sc = spark.sparkContext
# log4j is reached through the py4j gateway exposed as sc._jvm.
log4jLogger = sc._jvm.org.apache.log4j
LOGGER = log4jLogger.LogManager.getLogger(__name__)
LOGGER.info("pyspark script logger initialized")

In [None]:
from ams.config import constants

# Choose the project root and data directory for the current runtime layout:
# native Spark uses paths relative to the notebook plus constants.DATA_PATH,
# otherwise the absolute /home/jovyan/work layout is used.
if not native_spark:
    project_root = "/home/jovyan/work/"
    data_path = Path('/home/jovyan/work/data/')
else:
    project_root = "../../../"
    data_path = Path(constants.DATA_PATH)

twitter_folder = 'twitter'

# Input folder holding the raw tweet drop files read by the processing loop.
file_path = Path(data_path) / twitter_folder / 'fixed_drop' / 'main'

In [None]:
from ams.services import schema_service

# Build a tweet schema from the bundled sample tweet file (via schema_service).
# NOTE(review): tweet_schema is not referenced by any later cell in this notebook; the
# processing loop lets spark.read.json infer the schema instead.
sample_tweet_path = Path(project_root) / "resources" / "sample_tweet.json"
tweet_schema = schema_service.get_twitter_schema(spark=spark, twitter_sample_path=sample_tweet_path)

In [None]:
import re

# HTML numeric entity substituted for every ',' in cleaned text.
entity_comma = '&#44;'
# Matches a single carriage return or line feed character.
line_ending_pattern = re.compile("[\r\n]")
def clean_text(text:str):
    """Remove line-ending characters from ``text`` and replace commas with ``entity_comma``.

    Line endings are deleted outright (not replaced by a space). ``None`` and the empty
    string are returned unchanged.
    """
    if not text:
        return text
    single_line = line_ending_pattern.sub('', text)
    return single_line.replace(",", entity_comma)
clean_text_udf = udf(f=clean_text, returnType=StringType())  # Spark UDF wrapper around clean_text (string -> string)

def get_cashtag_info(ticker: str, has_cashtag: bool, ticker_in_text: bool) -> Dict:
    """Package a single ticker match as a dict with keys ``ticker``, ``has_cashtag`` and ``ticker_in_text``."""
    info = dict(ticker=ticker, has_cashtag=has_cashtag, ticker_in_text=ticker_in_text)
    return info


In [None]:
search_tuples = twitter_service.get_ticker_searchable_tuples()

print(f'number of search tuples: {len(search_tuples)}')

# Tweets with no ticker match are removed by the explode in find_tickers_and_explode, so an
# empty ticker list would silently produce no output rows at all; stop early instead.
if len(search_tuples) == 0:
    raise ValueError("No ticker search tuples were loaded; refusing to flatten tweets with nothing to match.")

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, StringType

def fix_columns(df: DataFrame):
    """Project the nested tweet DataFrame onto a flat, fixed set of columns.

    - Top-level tweet fields are kept under their own names.
    - The first four elements of ``entities.user_mentions`` and ``entities.urls`` become
      ``entities_<kind>_<i>`` string columns.
    - ``metadata.*``, ``user.*`` and ``f22_place.*`` (the struct added by ``clean_place``)
      are lifted into prefixed top-level columns; four ``user`` counters are cast to int.

    Args:
        df: tweet DataFrame containing ``entities``, ``metadata``, ``user`` and ``f22_place``.

    Returns:
        A DataFrame with only the flattened columns, in a fixed order.
    """
    top_level_fields = [
        'created_at', 'id', 'text', 'truncated', 'source',
        'in_reply_to_status_id', 'in_reply_to_user_id', 'in_reply_to_screen_name',
        'contributors', 'is_quote_status', 'retweet_count', 'favorite_count',
        'retweeted', 'possibly_sensitive', 'lang',
    ]

    # entities_user_mentions_0..3 then entities_urls_0..3, each rendered as a string.
    entity_columns = [
        F.col(f'entities.{entity}')[position].alias(f'entities_{entity}_{position}').cast(StringType())
        for entity in ('user_mentions', 'urls')
        for position in range(4)
    ]

    metadata_columns = [
        F.col(f'metadata.{field}').alias(f'metadata_{field}')
        for field in ('iso_language_code', 'result_type')
    ]

    user_fields = [
        'id', 'name', 'screen_name', 'location', 'description', 'url', 'protected',
        'followers_count', 'friends_count', 'listed_count', 'created_at', 'favourites_count',
        'utc_offset', 'time_zone', 'geo_enabled', 'verified', 'statuses_count', 'lang',
        'contributors_enabled', 'is_translator', 'is_translation_enabled',
        'profile_background_color', 'profile_background_image_url',
        'profile_background_image_url_https', 'profile_background_tile',
        'profile_image_url', 'profile_image_url_https', 'profile_banner_url',
        'profile_link_color', 'profile_sidebar_border_color', 'profile_sidebar_fill_color',
        'profile_text_color', 'profile_use_background_image', 'has_extended_profile',
        'default_profile', 'default_profile_image', 'following', 'follow_request_sent',
        'notifications', 'translator_type',
    ]
    # Only these user counters are cast; listed_count is intentionally left as-is (as before).
    integer_user_fields = {'followers_count', 'friends_count', 'favourites_count', 'statuses_count'}

    user_columns = []
    for field in user_fields:
        user_column = F.col(f'user.{field}').alias(f'user_{field}')
        if field in integer_user_fields:
            user_column = user_column.cast(IntegerType())
        user_columns.append(user_column)

    place_columns = [
        F.col(f'f22_place.{field}').alias(field).cast(StringType())
        for field in ('place_country', 'place_full_name', 'place_name')
    ]

    flattened = df.select(
        *top_level_fields,
        *entity_columns,
        *metadata_columns,
        *user_columns,
        *place_columns,
    )
    return flattened.drop('user', 'metadata', 'entities', 'f22_place')

In [None]:
def clean_columns(df: DataFrame):
    """Apply ``clean_text_udf`` to each free-text column, then drop duplicate ``id`` rows.

    NOTE(review): ``place_full_name`` is not in the cleaned list even though ``place_name`` and
    ``place_country`` are, so commas/line endings in it pass through unchanged — confirm intended.
    """
    text_columns = (
        "text",
        "user_name",
        "user_screen_name",
        "user_location",
        "user_description",
        "entities_user_mentions_0",
        "entities_user_mentions_1",
        "entities_user_mentions_2",
        "entities_user_mentions_3",
        "entities_urls_0",
        "entities_urls_1",
        "entities_urls_2",
        "entities_urls_3",
        "place_name",
        "user_url",
        "user_profile_background_image_url",
        "source",
        "in_reply_to_screen_name",
        "place_country",
    )
    cleaned = df
    for column_name in text_columns:
        cleaned = cleaned.withColumn(column_name, clean_text_udf(F.col(column_name)))
    return cleaned.dropDuplicates(['id'])

In [None]:
# search_tuples is already loaded by the earlier cell that calls
# twitter_service.get_ticker_searchable_tuples() and prints its size; reuse it instead of
# querying the service a second time, and fail with a clear message if that cell was skipped.
if 'search_tuples' not in globals():
    raise NameError("search_tuples is not defined; run the ticker-loading cell first.")

In [None]:
def get_cashtags_row_wise(row: Row):
    """Find which searchable tickers a tweet row mentions.

    Runs as a Spark UDF (see ``find_tickers_and_explode``). Only fields whose names end in
    ``_lc`` are considered; ``text_lc`` is placed first in the search string so a match
    position can be classified as inside the tweet text or in another field.

    For every entry ``s`` in the module-level ``search_tuples`` (``s[0]`` is the ticker symbol,
    ``s[1]`` a name, presumably the company name):
      * if ``$<ticker>`` occurs anywhere, a match with ``has_cashtag=True`` is recorded;
      * otherwise, if both the bare ticker and the name occur, a match with
        ``has_cashtag=False`` is recorded.

    Args:
        row: Spark ``Row`` whose ``*_lc`` fields hold lower-cased values or None.

    Returns:
        A list of dicts with keys ``ticker``, ``has_cashtag``, ``ticker_in_text`` and
        ``num_other_tickers_in_tweet`` (count of the other matches found in the same row).
    """
    row_dict = row.asDict()

    text = row_dict.get('text_lc')
    text = '' if text is None else str(text)
    other_fields = [
        '' if value is None else str(value)
        for key, value in row_dict.items()
        if key.endswith('_lc') and key != 'text_lc'
    ]

    # Separate the fields with a newline so a '$ticker' or name match cannot span the end of
    # one field and the start of the next. Offsets below len(text) still mean "in the text".
    field_separator = '\n'
    haystack = field_separator.join([text] + other_fields)
    text_len = len(text)

    matches = []
    for search_tuple in search_tuples:
        ticker = search_tuple[0]
        ticker_lc = ticker.lower()
        name_lc = search_tuple[1].lower()

        cashtag_index = haystack.find(f'${ticker_lc}')
        if cashtag_index > -1:
            matches.append(get_cashtag_info(ticker=ticker, has_cashtag=True,
                                            ticker_in_text=cashtag_index < text_len))
            continue

        # Without a cashtag, require both the bare ticker and the name to appear.
        ticker_index = haystack.find(ticker_lc)
        if ticker_index > -1 and name_lc in haystack:
            matches.append(get_cashtag_info(ticker=ticker, has_cashtag=False,
                                            ticker_in_text=ticker_index < text_len))

    # The count only depends on the final match list, so compute it once after the scan.
    num_other_tickers = len(matches) - 1
    for match in matches:
        match['num_other_tickers_in_tweet'] = num_other_tickers

    return matches

In [None]:
def find_tickers_and_explode(df: DataFrame):
    """Tag each tweet with the tickers it mentions and emit one row per (tweet, ticker).

    Lower-cased copies of the searchable columns are fed to ``get_cashtags_row_wise`` as a
    Spark UDF. The resulting array is exploded, so tweets with no matched ticker produce no
    rows. The match fields are exposed as ``f22_ticker``, ``f22_has_cashtag``,
    ``f22_ticker_in_text`` and ``f22_num_other_tickers_in_tweet``.

    Args:
        df: flattened, cleaned tweet DataFrame (must contain every column in ``columns_to_search``).

    Returns:
        The input columns, in their original order, plus the four ``f22_*`` match columns. The
        temporary ``*_lc`` and ``f22`` helper columns are not included.
    """
    columns_to_search = ['text', 'source', 'entities_user_mentions_0', 'entities_user_mentions_1', 'entities_user_mentions_2', 'entities_user_mentions_3', 'entities_urls_0', 'entities_urls_1', 'entities_urls_2', 'entities_urls_3', 'user_description', 'user_url']

    lc_cols = [f'{c}_lc' for c in columns_to_search]
    for source_col, lc_col in zip(columns_to_search, lc_cols):
        df = df.withColumn(lc_col, F.lower(F.col(source_col)))

    cashtag_schema = ArrayType(StructType(fields=[StructField('ticker', StringType()),
                                                  StructField('has_cashtag', BooleanType()),
                                                  StructField('ticker_in_text', BooleanType()),
                                                  StructField('num_other_tickers_in_tweet', IntegerType())
                                                  ]))
    get_cashtags_row_wise_udf = udf(get_cashtags_row_wise, cashtag_schema)

    # get_cashtags_row_wise only reads the *_lc fields, so ship just those to the Python worker
    # instead of serializing every column of the row.
    df = df.withColumn("f22", get_cashtags_row_wise_udf(struct(*lc_cols)))

    # explode yields no rows for an empty array, which drops tweets without any ticker match.
    df = df.withColumn('f22', explode(F.col('f22')))

    # Keep the original column order; a Python set difference would give an order that depends on
    # string hash randomization and can differ from one kernel session to the next.
    helper_cols = set(lc_cols) | {'f22'}
    kept_cols = [c for c in df.columns if c not in helper_cols]

    return df.select(*kept_cols,
                     F.col('f22.ticker').alias('f22_ticker'),
                     F.col('f22.has_cashtag').alias('f22_has_cashtag'),
                     F.col('f22.ticker_in_text').alias('f22_ticker_in_text'),
                     F.col('f22.num_other_tickers_in_tweet').alias('f22_num_other_tickers_in_tweet'))

In [None]:
from ams.services import dataframe_services 
from ams.services.dataframe_services import PersistedDataFrameTypes
from retry import retry

# Output location and format for the flattened tweets.
flat_drop_path = Path(data_path) / twitter_folder / 'flattened_drop' / "main"
prefix = "tweets_flat"

file_type = PersistedDataFrameTypes.PARQUET

@retry(tries=3)
def persist(df: DataFrame):
    """Write ``df`` to ``flat_drop_path`` via dataframe_services, in up to three attempts.

    The output settings (prefix, file type, 20 output files) come from the module-level values
    defined above, read at call time.
    """
    output_options = {
        'output_drop_folder_path': flat_drop_path,
        'prefix': prefix,
        'num_output_files': 20,
        'file_type': file_type,
    }
    dataframe_services.persist_dataframe(df=df, **output_options)

In [None]:
def clean_unexpected_null_column(place: "Row | None") -> Dict:
    """Flatten a tweet's ``place`` value into a dict with a fixed set of keys.

    Used as a UDF by ``clean_place`` so the output always has the same three string fields,
    whether or not the tweet has a place.

    Args:
        place: ``None`` or a struct/``Row`` indexable by ``"country"``, ``"full_name"`` and
            ``"name"`` (it is indexed by field name, so it is not a ``str``).

    Returns:
        ``{"place_country", "place_full_name", "place_name"}`` mapped to the corresponding
        values, or to ``None`` when ``place`` is ``None``.
    """
    if place is None:
        return {"place_country": None, "place_full_name": None, "place_name": None}
    return {
        "place_country": place["country"],
        "place_full_name": place["full_name"],
        "place_name": place["name"],
    }

# Struct returned by clean_unexpected_null_column_udf: three string fields, in this order.
schema = StructType(fields=[StructField(field_name, StringType())
                            for field_name in ('place_country', 'place_full_name', 'place_name')])

clean_unexpected_null_column_udf = udf(clean_unexpected_null_column, schema)

def clean_place(df: DataFrame):
    """Replace the raw ``place`` column with the fixed-shape ``f22_place`` struct."""
    return (
        df.withColumn("f22_place", clean_unexpected_null_column_udf(F.col("place")))
          .drop("place")
    )

In [None]:
%%time

from ams.services import file_services

files = [str(f) for f in file_services.list_files(parent_path=file_path, ends_with=".txt")]

def chunk(lst, n):
    """Yield successive n-sized chunks from lst."""
    for i in range(0, len(lst), n):
        yield lst[i:i + n]

# Number of input files read together in one Spark job.
files_per_chunk = 3
# 1-based index of the first chunk to process; raise it to resume a partially completed run.
start_chunk = 1

chunked_list = list(chunk(files, files_per_chunk))

tot_chunks = len(chunked_list)

# The loop variable is `file_chunk` so it does not shadow the chunk() helper defined above.
for ndx, file_chunk in enumerate(chunked_list, start=1):
    if ndx < start_chunk:
        continue

    print(f"Processing {ndx} of {tot_chunks}.")

    print(f"Processing files: {file_chunk}")

    # DataFrameReader.json accepts a list of paths, so every file in the chunk is loaded
    # (reading only file_chunk[0] would skip the rest of the chunk).
    # NOTE(review): the schema is inferred separately for each chunk; passing
    # schema=tweet_schema would keep column types consistent across chunks — confirm the
    # sample schema covers every field fix_columns selects before switching.
    df_init = spark.read.json(file_chunk)

    df_unduped = df_init.dropDuplicates(['id'])

    df_clean_place = clean_place(df=df_unduped)

    df_thin = fix_columns(df=df_clean_place)

    df_clean = clean_columns(df=df_thin)

    df_tickered = find_tickers_and_explode(df=df_clean)

    persist(df=df_tickered)