# Data Analytics - Datathon 2024

## Libraries

In [0]:
# Loading Libraries for the notebook
from pyspark.sql.functions import *
import datetime
from pyspark.sql.window import Window

# UDFS

In [0]:

def cal_lat_log_dist(df, lat1, long1, lat2, long2):
    """Add the straight-line (great-circle) distance in kilometres between two points.

    The distance is computed per row with the spherical law of cosines and a
    sphere radius of 6371.0 km. All four coordinate columns are passed through
    ``radians`` and are therefore treated as degrees.

    Args:
        df: DataFrame that receives the new "distance_in_kms" column.
        lat1: Name of the column holding the latitude of place 1.
        long1: Name of the column holding the longitude of place 1.
        lat2: Name of the column holding the latitude of place 2.
        long2: Name of the column holding the longitude of place 2.

    Returns:
        DataFrame with a new column named "distance_in_kms", rounded to
        4 decimals. Rows with a null coordinate keep a null distance.
    """
    lat1_rad = radians(col(lat1))
    lat2_rad = radians(col(lat2))
    long_delta_rad = radians(col(long1)) - radians(col(long2))

    cos_central_angle = (
        sin(lat1_rad) * sin(lat2_rad)
        + cos(lat1_rad) * cos(lat2_rad) * cos(long_delta_rad)
    )

    # Floating-point rounding can push the cosine marginally outside [-1, 1]
    # (e.g. for two identical points), which would make acos return NaN.
    # A when-chain is used instead of least/greatest because those skip
    # nulls and would turn a null coordinate into a bogus distance.
    clamped_cos = (
        when(cos_central_angle > 1.0, lit(1.0))
        .when(cos_central_angle < -1.0, lit(-1.0))
        .otherwise(cos_central_angle)
    )

    return df.withColumn(
        'distance_in_kms',
        round(acos(clamped_cos) * lit(6371.0), 4),
    )

## Loading Data

In [0]:
# Read the three source tables from the BRONZE database, one query per table,
# and bind the resulting DataFrames in the same order as the queries.
GDELT_EVENTS, PORT_LOCATIONS_DIM, CAMEO_DICTIONARY = (
    spark.sql(query)
    for query in (
        "SELECT * FROM BRONZE.GDELT_EVENTS",
        "SELECT * FROM BRONZE.PORTS_DICTIONARY",
        "SELECT * FROM BRONZE.CAMEO_DICTIONARY",
    )
)

## Cleaning PORT_LOCATIONS_DIM

In [0]:
# Cleaning RAW data from PORT_LOCATIONS.
# LATITUDE / LONGITUDE are handled as text whose last character is the
# orientation letter (N/S for latitude, E/W for longitude). The characters
# before that letter are cast to DOUBLE and signed: S and W become negative.
# NOTE(review): assumes the part before the letter is a plain decimal number
# - confirm against BRONZE.PORTS_DICTIONARY.

PORT_LOCATIONS_DIM_CLEANED = (
    PORT_LOCATIONS_DIM
    .filter("LATITUDE IS NOT NULL")  # keep only rows that have a latitude
    .filter("LONGITUDE IS NOT NULL")  # keep only rows that have a longitude
    .withColumn("LATITUDE", regexp_replace(col("LATITUDE"), " ", ""))  # remove blank spaces in LATITUDE
    .withColumn("LONGITUDE", regexp_replace(col("LONGITUDE"), " ", ""))  # remove blank spaces in LONGITUDE
    .withColumn("Lat_Ori", substring(col("LATITUDE"), -1, 1))  # trailing orientation letter of LATITUDE
    .withColumn("Long_Ori", substring(col("LONGITUDE"), -1, 1))  # trailing orientation letter of LONGITUDE
    .withColumn(
        "LATITUDE_CORRECTED",
        when(col("Lat_Ori") == 'N', expr("CAST(substring(LATITUDE,1,length(LATITUDE) - 1 ) AS DOUBLE)"))
        .when(col("Lat_Ori") == 'S', expr("CAST(substring(LATITUDE,1,length(LATITUDE) - 1 ) AS DOUBLE)") * -1)
        # Any other letter (including E/W found in the latitude field) gets the
        # sentinel so the row can be inspected instead of being sign-flipped.
        .otherwise(999.999)
    )
    .withColumn(
        "LONGITUDE_CORRECTED",
        when(col("Long_Ori") == 'E', expr("CAST(substring(LONGITUDE,1,length(LONGITUDE) - 1 ) AS DOUBLE)"))
        .when(col("Long_Ori") == 'W', expr("CAST(substring(LONGITUDE,1,length(LONGITUDE) - 1 ) AS DOUBLE)") * -1)
        # The longitude is never derived from the LATITUDE column; rows whose
        # longitude has no E/W letter are flagged with the sentinel.
        .otherwise(999.999)
    )
    .select("COUNTRY", "PORT", "LATITUDE_CORRECTED", "LONGITUDE_CORRECTED")  # keep only the columns of interest
)

## DATA ANALYSIS FOR ALL COUNTRIES IN THE TRANSPACIFIC ROUTE:
* CANADA
* USA
* CHINA
* JAPAN
* SOUTH KOREA
* TAIWAN
* VIETNAM
* HONG KONG

#### Dataset with Countries of Interest

In [0]:
# Events located in the countries of the Transpacific route, enriched with the
# CAMEO description and with date / week identifiers.
# NOTE(review): assumes ActionGeo_CountryCode holds FIPS 10-4 codes as in the
# GDELT codebook; "TW" (Taiwan) was added because Taiwan is part of the
# route's country list but had no code in the filter.
GDELT_EVENTS_TPR = (
    GDELT_EVENTS
    .filter(col("ActionGeo_CountryCode").isin("US", "CA", "VM", "CH", "JA", "HK", "KS", "TW"))  # countries of interest
    .join(CAMEO_DICTIONARY, col("EventCode") == col("CAMEO CODE"), "left")  # bring in the CAMEO dictionary columns for each EventCode
    .filter("DESCRIPTION is not null")  # drop events with no description
    .withColumn("Date", to_date(col("Day").cast("string"), "yyyyMMdd"))  # parse Day (yyyyMMdd) into a date column
    .filter("Date >= '2023-01-01'")  # keep events from 2023 onwards
    .withColumn("YearWeek", weekofyear(col("Date")))  # week number of the year
    .withColumn("MonthYearWeek", concat(col("MonthYear"), col("YearWeek")))  # MonthYear and week number concatenated
)

## Cleaning for non duplicate information

In [0]:
# Keep exactly one event per SOURCEURL.
# Ordering the window by its own partition key left the surviving row
# arbitrary, so results could change between runs. Ordering by the event id
# makes the choice reproducible.
# NOTE(review): assumes the events table exposes a GlobalEventID column (it is
# selected from a GDELT events frame later in this notebook) - confirm.
GDELT_EVENTS_TPR_UNIQUE_LINKS = (
    GDELT_EVENTS_TPR
    .withColumn(
        "NumNew",
        row_number().over(Window.partitionBy("SOURCEURL").orderBy("GlobalEventID")),
    )
    .filter("NumNew == 1")  # first row of each URL
    .drop("NumNew")  # helper column no longer needed
)

In [0]:
# Political events: de-duplicated events where the first type code of
# actor 1 or of actor 2 equals 'GOV'.
GDELT_EVENTS_TPR_NTI_POLITICAL = GDELT_EVENTS_TPR_UNIQUE_LINKS.where(
    (col("Actor1Type1Code") == 'GOV') | (col("Actor2Type1Code") == 'GOV')
)


In [0]:
# Inspect the political events with country code 'CH' on 2023-04-04.
display(
    GDELT_EVENTS_TPR_NTI_POLITICAL.where(
        (col("ActionGeo_CountryCode") == 'CH') & (col("Date") == '2023-04-04')
    )
)

In [0]:
# Daily article-weighted Goldstein score for country code 'CH':
# sum(GoldsteinScale * NumArticles) / sum(NumArticles) per Date.
display(
    GDELT_EVENTS_TPR_NTI_POLITICAL
    .where(col("ActionGeo_CountryCode") == 'CH')
    .groupBy("Date")
    .agg(
        sum(col("GoldsteinScale") * col("NumArticles")).alias("PondGS"),
        sum("NumArticles").alias("NumArticles"),
    )
    .withColumn("GSPonderado", col("PondGS") / col("NumArticles"))
)

Databricks visualization. Run in Databricks to view.

In [0]:
# Per date and sign label, show the event count plus the average and maximum
# GoldsteinScale for country code 'CH'. Scores below 0 are labelled
# "Negative"; everything else is labelled "Positive".
display(
    GDELT_EVENTS_TPR_NTI_POLITICAL
    .where(col("ActionGeo_CountryCode") == 'CH')
    .withColumn(
        "GoldsteinLabel",
        when(col("GoldsteinScale") < 0, "Negative").otherwise("Positive"),
    )
    # Sign-flipped copy of negative scores; not part of the aggregation below.
    .withColumn(
        "GoldsteinScaleAdjusted",
        when(col("GoldsteinScale") < 0, -col("GoldsteinScale")).otherwise(col("GoldsteinScale")),
    )
    .groupBy("Date", "GoldsteinLabel")
    .agg(count("Date"), avg("GoldsteinScale"), max("GoldsteinScale"))
)

# Analysis of Goldstein Scale by type of event

The objective of this chunk of code is to categorize event codes so that it is clear which types of events belong to the negative part of the Goldstein scale and which belong to the positive part. The output of the cell is a dimensional table that categorizes the events based on their range of Goldstein scale values. When the whole range lies within negative numbers, the event is considered negative. When the whole range lies within positive numbers, the event is considered positive. Events whose Goldstein scale range spans both positive and negative numbers are discarded from the analysis, as they could introduce noise within the ranges.

In [0]:
# Dimension table mapping EventCode -> EventCategory.
# For each (DESCRIPTION, EventCode) pair the min and max GoldsteinScale are
# computed over the de-duplicated events. A pair is "Negative" when both are
# below 0, "Positive" when both are above 0, and is dropped otherwise.
GDELT_EVENTS_CATEGORIZATION_GS_DIM = (
    GDELT_EVENTS_TPR_UNIQUE_LINKS
    .groupBy("DESCRIPTION", "EventCode")
    .agg(
        max("GoldsteinScale").alias("MaxGS"),
        min("GoldsteinScale").alias("MinGS"),
    )
    .withColumn(
        "EventCategory",
        expr(
            "CASE WHEN MaxGS < 0 AND MinGS < 0 THEN 'Negative' "
            "WHEN MaxGS > 0 AND MinGS > 0 THEN 'Positive' "
            "ELSE 'Not Clear Category' END"
        ),
    )
    .where("EventCategory != 'Not Clear Category'")  # discard events without a clear category
    .select("EventCode", "EventCategory")  # columns of interest
)

Databricks visualization. Run in Databricks to view.

### Data Filtering
Within this chunk of code we separate the types of news occurring within each country. In the first cell we select the news that have a clear category, determined by their impact on the Goldstein scale. In the next cell we filter the news by 9 actor codes that may have an effect on our business objective.

In [0]:
# De-duplicated events that carry a clear Positive / Negative category.
GDELT_EVENTS_TPR_CATEGORIZED = (
    GDELT_EVENTS_TPR_UNIQUE_LINKS
    .join(GDELT_EVENTS_CATEGORIZATION_GS_DIM, "EventCode", "left")  # attach EventCategory
    # After the left join, events whose EventCode has no clear category have a
    # null EventCategory. Filtering on DESCRIPTION (already non-null upstream)
    # removed nothing, so the category column is the one to check.
    .filter("EventCategory is not null")
    .withColumn("GoldsteinScaleLable", col("GoldsteinScale").astype("int"))  # GoldsteinScale cast to integer
)

In [0]:
# GDELT EVENTS POLITICAL
# Categorized events where the first type code of actor 1 or of actor 2 is
# 'GOV'. Actor 2 is now checked on Actor2Type1Code (previously
# Actor2Type2Code) so both actors are tested on the same field, matching the
# earlier definition of this variable.

GDELT_EVENTS_TPR_NTI_POLITICAL = (
    GDELT_EVENTS_TPR_CATEGORIZED
    .filter(
        (col("Actor1Type1Code").isin('GOV'))
        | (col("Actor2Type1Code").isin('GOV'))
    )
)

%md
#### Finding Best Objective Variable

In [0]:
# GDELT_EVENTS_TPR_NTI is not defined in any cell of this notebook, so this
# cell failed on a fresh kernel.
# NOTE(review): assumes the frame was renamed to GDELT_EVENTS_TPR_NTI_POLITICAL - confirm.
display(GDELT_EVENTS_TPR_NTI_POLITICAL)

In [0]:
# Most frequent event DESCRIPTION per Date.
# Fixes: the empty grouping column "" is replaced by "DESCRIPTION" (the
# column used by the equivalent ranking cell further down), and the undefined
# GDELT_EVENTS_TPR_NTI is replaced by GDELT_EVENTS_TPR_NTI_POLITICAL.
# NOTE(review): assumes GDELT_EVENTS_TPR_NTI was renamed to
# GDELT_EVENTS_TPR_NTI_POLITICAL - confirm.
# NOTE(review): the variable name says CANADA but the filter keeps 'CH', the
# code this notebook pairs with COUNTRY == 'CHINA' - confirm which is intended.
CANADA_TYPE_OF_MOST_POPULAR_EVENTS_PER_DATE = (
    GDELT_EVENTS_TPR_NTI_POLITICAL
    .filter("ActionGeo_CountryCode == 'CH'")
    .groupby("Date", "DESCRIPTION").count()
    .withColumn(
        "Rank",
        row_number().over(
            Window.partitionBy("Date")
            # DESCRIPTION breaks ties so the top row is reproducible.
            .orderBy(col("count").desc(), col("DESCRIPTION"))
        ),
    )
    .filter("Rank == 1")
)

display(CANADA_TYPE_OF_MOST_POPULAR_EVENTS_PER_DATE)

Databricks visualization. Run in Databricks to view.

In [0]:
# Rank event descriptions by daily frequency for country code 'CH'.
# GDELT_EVENTS_TPR_NTI is not defined in any cell of this notebook.
# NOTE(review): assumes it was renamed to GDELT_EVENTS_TPR_NTI_POLITICAL - confirm.
display(
    GDELT_EVENTS_TPR_NTI_POLITICAL
    .filter("ActionGeo_CountryCode == 'CH'")
    .groupby("Date", "DESCRIPTION").count()
    .withColumn("Rank", row_number().over(Window.partitionBy("Date").orderBy(col("count").desc())))
)

In [0]:
# Inspect the events with country code 'CA' on 2023-02-19.
# GDELT_EVENTS_TPR_NTI is not defined in any cell of this notebook.
# NOTE(review): assumes it was renamed to GDELT_EVENTS_TPR_NTI_POLITICAL - confirm.
display(
    GDELT_EVENTS_TPR_NTI_POLITICAL
    .filter("ActionGeo_CountryCode == 'CA'")
    .filter("Date == '2023-02-19'")
)

### Analysing Variables Related to Positive Events

The first thing is to generate a base table. This base table will help us determine an objective from which to start generating and analyzing variables and correlations. First we have to clarify our "unit of analysis": these units determine the level at which our data is aggregated. The unit of analysis in this case is "Date", "ActionGeo_CountryCode", "GoldsteinScaleLabel", and the value we are going to try to predict is the number of articles written in each category of the Goldstein scale.

In [0]:
# GDELT_EVENTS_TPR_POSITIVE / GDELT_EVENTS_TPR_NEGATIVE were not defined in
# any cell of this notebook, so this cell failed on a fresh kernel. They are
# derived here from the EventCategory label of the categorized events.
# NOTE(review): confirm this matches the split that was originally intended.
GDELT_EVENTS_TPR_POSITIVE = GDELT_EVENTS_TPR_CATEGORIZED.filter(col("EventCategory") == "Positive")
GDELT_EVENTS_TPR_NEGATIVE = GDELT_EVENTS_TPR_CATEGORIZED.filter(col("EventCategory") == "Negative")

# Base tables: average GoldsteinScale per Date and ActionGeo_CountryCode.
GDELT_EVENTS_TPR_POSITIVE_BASE = (
    GDELT_EVENTS_TPR_POSITIVE
    .groupBy("Date", "ActionGeo_CountryCode")
    .agg(avg("GoldsteinScale").alias("GoldsteinScale"))
)

GDELT_EVENTS_TPR_NEGATIVE_BASE = (
    GDELT_EVENTS_TPR_NEGATIVE
    .groupBy("Date", "ActionGeo_CountryCode")
    .agg(avg("GoldsteinScale").alias("GoldsteinScale"))
)

display(GDELT_EVENTS_TPR_POSITIVE_BASE)

Databricks visualization. Run in Databricks to view.

In [0]:
# Show the positive-events base table.
# NOTE(review): this repeats the display call at the end of the previous cell - confirm it is still needed.
display(GDELT_EVENTS_TPR_POSITIVE_BASE)

In [0]:
# Show the cleaned ports table (COUNTRY, PORT and the corrected coordinates).
display(PORT_LOCATIONS_DIM_CLEANED)

In [0]:
# Location columns and source URL of every event that has both an action
# latitude and an action longitude.
# NOTE(review): GDELT_EVENTS_DATE is not defined in any visible cell - confirm where it is created.
GDELT_EVENTS_INTEREST = (
    GDELT_EVENTS_DATE
    .where(col("ActionGeo_Lat").isNotNull() & col("ActionGeo_Long").isNotNull())
    .select(
        "ActionGeo_Fullname",
        "ActionGeo_CountryCode",
        "ActionGeo_Lat",
        "ActionGeo_Long",
        "SOURCEURL",
    )
)

In [0]:
# Show the geolocated events selected in the previous cell.
display(GDELT_EVENTS_INTEREST)

In [0]:
# Pair every event with country code 'CH' with every port whose COUNTRY is
# 'CHINA' (cartesian product), then add the event-to-port distance in km.
GDELT_CROSS_PORT = (
    GDELT_EVENTS_INTEREST
    .where(col("ActionGeo_CountryCode") == 'CH')
    .crossJoin(PORT_LOCATIONS_DIM_CLEANED.where(col("COUNTRY") == 'CHINA'))
)

GDELT_CROSS_PORT_DISTANCE = cal_lat_log_dist(
    df=GDELT_CROSS_PORT,
    lat1='ActionGeo_Lat',
    long1='ActionGeo_Long',
    lat2='LATITUDE_CORRECTED',
    long2='LONGITUDE_CORRECTED',
)

display(GDELT_CROSS_PORT_DISTANCE)

## DA

In [0]:
# TODO: GDELT_EVENTS_BASE was left as an unfinished assignment, which is a
# syntax error. It is bound to None as a placeholder until it is defined.
GDELT_EVENTS_BASE = None

In [0]:
# Number of distinct GlobalEventID values in GDELT_EVENTS_DATE.
# NOTE(review): GDELT_EVENTS_DATE is not defined in any visible cell - confirm where it is created.
GDELT_EVENTS_DATE.select("GlobalEventID").distinct().count()

In [0]:
# Show the GDELT_EVENTS_DATE frame.
# NOTE(review): GDELT_EVENTS_DATE is not defined in any visible cell - confirm where it is created.
display(GDELT_EVENTS_DATE)

In [0]:
# Events related to ports. Both filters are currently disabled, so this is
# the unfiltered GDELT_EVENTS_DATE frame. The closing parenthesis had been
# commented out together with the filters, which left the cell as a syntax error.
# NOTE(review): GDELT_EVENTS_DATE is not defined in any visible cell - confirm where it is created.
GDELT_EVENTS_PORTS = (GDELT_EVENTS_DATE
#.filter(col("ActionGeo_Fullname").like("% Port %"))
#.filter(col("SOURCEURL").like("%-port-%"))
)

#print("Number of observations with the name a location with name port in it: ",GDELT_EVENTS_PORTS.count())

In [0]:
# Event counts per QuadClass for each (ActionGeo_ADM1Code, Date_Week), limited
# to events whose location name contains " Port ".
# The pivot values are listed explicitly so the columns "1".."4" always exist
# (the next cell selects them by name) even when a QuadClass is absent from
# the filtered data; listing them also avoids an extra pass to discover them.
# NOTE(review): GDELT_EVENTS_DATE is not defined in any visible cell - confirm where it is created.
PLACE_CUADCLASSCOUNT = (
    GDELT_EVENTS_DATE
    .filter(col("ActionGeo_Fullname").like("% Port %"))
    .groupby("ActionGeo_ADM1Code", "Date_Week")
    .pivot("QuadClass", [1, 2, 3, 4])
    .count()
)

# Average GoldsteinScale per (ActionGeo_ADM1Code, Date_Week) over all events.
GLDSCALE = GDELT_EVENTS_DATE.groupby("ActionGeo_ADM1Code", "Date_Week").agg(avg("GoldsteinScale").alias("GoldsteinScale"))

# Attach the average score to the counts; missing counts / scores become 0.
ADDEDE = PLACE_CUADCLASSCOUNT.join(GLDSCALE, ["ActionGeo_ADM1Code", "Date_Week"], "left").fillna(0)

In [0]:
# Row-wise maximum across the four QuadClass count columns.
max_value = greatest(*[col(name) for name in ADDEDE.select("1", "2", "3", "4").columns])

# Max_QuadClass is the label of the first column, checked in the order
# "1" -> "4", whose count equals the row maximum. Each `when` without an
# `otherwise` yields null on no match, so coalesce picks the first match.
df_with_max_column = ADDEDE.withColumn(
    "Max_QuadClass",
    coalesce(*[when(col(label) == max_value, label) for label in ("1", "2", "3", "4")]),
)

Databricks visualization. Run in Databricks to view.

In [0]:
# Show the per-location, per-week QuadClass counts with the dominant QuadClass label.
display(df_with_max_column)

Databricks visualization. Run in Databricks to view.