In [1]:

from pyspark.sql import SparkSession
import os
from pyspark.sql.functions import split, explode,to_date, regexp_extract, col, upper

import shutil
import gc

# Build the Spark session used by every cell below. getOrCreate() hands back
# an already-running session when one exists.
spark = (
    SparkSession.builder
    .appName("refine_gkg_counts")
    .config("spark.driver.memory", "20g")
    .getOrCreate()
)

# Input: root folder that is scanned for raw event exports.
base_dir = '/home/oscar/budasbi-repos/factored-datathon-2024-voyager/bucket_contents/raw/events'
# Output: folder that receives the cleaned parquet dataset.
parquet_path = '/home/oscar/budasbi-repos/factored-datathon-2024-voyager/parquet/events'

# When output from an earlier run is present, wipe it and recreate the empty
# folder so the append-mode writes start from a clean slate.
if os.path.exists(parquet_path):
    shutil.rmtree(parquet_path)
    os.makedirs(parquet_path)


24/08/20 17:34:41 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [2]:
# Show the SparkContext behind the session as this cell's output.
spark.sparkContext

In [3]:
def cleaned_to_parquet(csv_filepath):
    """Clean one tab-separated events export and append it to the parquet dataset.

    Steps, in order:
      1. Read the header-less, tab-separated file with an inferred schema.
      2. Name the 58 columns.
      3. Parse ``date_added`` and ``sqldate`` from ``yyyyMMdd`` into dates.
      4. Upper-case the actor / geo code and name columns.
      5. Split each of the two actor geo feature ids into a digits-only
         ``*_num`` column and a no-digits ``*_text`` column.
      6. Turn empty strings into nulls.
      7. Append to ``parquet_path`` as snappy parquet, partitioned by ``date``.

    Relies on the notebook-level ``spark`` session and ``parquet_path``.

    Args:
        csv_filepath: Path of the export file to convert.

    Returns:
        None. The result is written to disk.
    """
    print(csv_filepath)
    events = spark.read.csv(csv_filepath, header=False, sep='\t',inferSchema=True)
    # Column names, in file order.
    # NOTE(review): "actor_name" (presumably the actor-2 name) and
    # "actor_1_country_Code" break the naming pattern of their neighbours; they
    # are kept unchanged because they define the output schema.
    event_columns = ["global_event_id", "sqldate", "month_year", "year", "fraction_date", "actor_1_code", "actor_1_name", "actor_1_country_Code", "actor_1_known_group_code", "actor_1_ethnic_code", 
                    "actor_1_religion_code", "actor_1_religion_2_code", "actor_1_type_code", "actor_1_type_2_code", "actor_1_type_3_code", "actor_2_code", "actor_name", 
                    "actor_2_country_code", "actor_2_known_group_code", "actor_2_ethnic_code", "actor_2_religion_1_code", "actor_2_religion_2_code", 
                    "actor_2_type_1_code", "actor_2_type_2_code", "actor_2_type_3_code", "is_root_event", "event_code", "event_base_code", "event_root_code", 
                    "quad_class", "goldstein_scale", "num_mentions", "num_sources", "num_articles", "avg_tone", "actor_1_geo_type", "actor_1_geo_fullname", "actor_1_geo_country_code", 
                    "actor_1_geo_adm1_code", "actor_1_geo_lat", "actor_1_geo_long", "actor_1_geo_feature_id", "actor_2_geo_type", "actor_2_geo_fullname", "actor_2_geo_country_code", 
                    "actor_2_geo_adm1_code", "actor_2_geo_lat", "actor_2_geo_long", "actor_2_geo_feature_id", "action_geo_type", "action_geo_fullname", "action_geo_country_code", 
                    "action_geo_adm1_code", "action_geo_lat", "action_geo_long", "action_geo_feature_id", "date_added", "source_url"]
    event_w_columns = events.toDF(*event_columns)

    # Cast date columns as date.
    event_w_to_date = event_w_columns.withColumn('date_added', to_date('date_added', 'yyyyMMdd'))
    event_w_to_date = event_w_to_date.withColumn('sqldate', to_date('sqldate', 'yyyyMMdd'))

    # Columns whose values are upper-cased. A set, so each name is listed once.
    uppercase_columns = {
        'actor_1_code', 'actor_2_code', 'actor_1_name', 'actor_1_country_Code',
        'actor_1_known_group_code', 'actor_1_ethnic_code', 'actor_1_religion_code',
        'actor_1_religion_2_code', 'actor_1_type_code', 'actor_1_type_2_code',
        'actor_1_type_3_code', 'actor_name', 'actor_2_country_code',
        'actor_2_known_group_code', 'actor_2_ethnic_code', 'actor_2_religion_1_code',
        'actor_2_religion_2_code', 'actor_2_type_1_code', 'actor_2_type_2_code',
        'actor_2_type_3_code', 'actor_1_geo_country_code', 'actor_1_geo_adm1_code',
        'actor_1_geo_feature_id', 'actor_2_geo_country_code', 'actor_2_geo_adm1_code',
    }
    # One projection instead of a withColumn per column: same result and same
    # column order, but a much smaller query plan.
    event_w_to_date = event_w_to_date.select(
        *[
            upper(col(name)).alias(name) if name in uppercase_columns else col(name)
            for name in event_w_to_date.columns
        ]
    )

    # Split columns in numeric and text values. Each pattern must match the
    # whole value; regexp_extract yields "" when it does not, and those empty
    # strings become nulls below.
    numeric_pattern = "^[0-9]+$"
    text_pattern = "^[^0-9]+$"
    for column in ['actor_1_geo_feature_id','actor_2_geo_feature_id']:
        event_w_to_date = event_w_to_date.withColumn(f"{column}_num", regexp_extract(col(column), numeric_pattern, 0)) \
                                        .withColumn(f"{column}_text", regexp_extract(col(column), text_pattern, 0))

    # Remove empty strings.
    df_null_replaced = event_w_to_date.na.replace("", None)

    # The writer partitions by a column named "date", which the schema above
    # does not contain, so the write failed on a missing partition column.
    # NOTE(review): "date" is derived from date_added here on the assumption
    # that it is the intended partition key -- confirm (sqldate is the other
    # candidate).
    df_partitioned = df_null_replaced.withColumn('date', col('date_added'))

    df_partitioned.write\
    .mode('append')\
    .format('parquet')\
    .option('compression', 'snappy')\
    .partitionBy('date')\
    .save(parquet_path)

    # No DataFrame here is ever cached, so there is nothing to unpersist.
    # The explicit GC pass between files is kept from the original.
    gc.collect()

In [4]:
# Recursively gather the full path of every file under base_dir whose name
# ends in ".CSV" (the suffix check is case-sensitive).
event_files = [
    os.path.join(folder, filename)
    for folder, _subfolders, filenames in os.walk(base_dir)
    for filename in filenames
    if filename.endswith(".CSV")
]


In [5]:
if not event_files:
    print("We didn't find any CSV file")
else:
    # Only the first discovered file is converted.
    # NOTE(review): this one-file limit looks like a debugging leftover --
    # iterate over the whole list to convert every export.
    for file in event_files[:1]:
        cleaned_to_parquet(file)


/home/oscar/budasbi-repos/factored-datathon-2024-voyager/bucket_contents/raw/events/year=2024/month=08/day=05/20240805.export.CSV


24/08/20 17:34:46 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                