In [1]:
from pyspark.sql.types import IntegerType
%load_ext autoreload
%autoreload 2

import pyspark
from delta import *

import sys

sys.path.append("..")  # Adjust path if needed
from dags.lib.IncrementalLoader import IncrementalLoader
from dags.lib.Processer import *


from pyspark.sql.functions import col, lit, max as spark_max, length, row_number, explode

# Environment sanity check: print the interpreter path and Python version the kernel is running on,
# so a wrong conda environment is noticed before any Spark code runs.
print("###############")
import sys
print(sys.executable)  # Should print Anaconda path
print(sys.version)     # Should print 3.11.x

###############
c:\Users\josub\.conda\envs\spark311\python.exe
3.11.11 | packaged by Anaconda, Inc. | (main, Dec 11 2024, 16:34:19) [MSC v.1929 64 bit (AMD64)]


In [2]:
def create_spark_session(keyfile_path=None, gcs_connector_jar="../gcs-connector-hadoop.jar"):
    """Build a local Spark session with Delta Lake and Google Cloud Storage support.

    Args:
        keyfile_path: Path to the GCP service-account JSON key. When omitted
            (or empty), the ``GOOGLE_APPLICATION_CREDENTIALS`` environment
            variable is used; only if that is unset as well does the function
            fall back to the path that used to be hard-coded here.
        gcs_connector_jar: Path to the GCS Hadoop connector jar that is put on
            ``spark.jars``. Defaults to the previously hard-coded location.

    Returns:
        The session produced by ``getOrCreate()`` on the Delta-configured
        builder.
    """
    import os  # local import: keeps this cell independent of imports made in later cells

    if not keyfile_path:
        # Legacy default kept only so existing no-argument calls behave as before;
        # prefer passing keyfile_path or exporting GOOGLE_APPLICATION_CREDENTIALS.
        keyfile_path = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS") or (
            r"C:\Users\josub\Desktop\BDMA\Barcelona\Big_Data_Management\SmallProject\LetsTalk\gcs.json"
        )

    conf = (
        pyspark.conf.SparkConf()
        .setAppName("LetsTalk")
        # Delta Lake catalog + SQL extension.
        .set(
            "spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog",
        )
        .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        # gs:// filesystem implementation and service-account authentication.
        .set("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
        .set("spark.hadoop.google.cloud.auth.service.account.enable", "true")
        .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", keyfile_path)
        .set("spark.sql.shuffle.partitions", "4")
        .set("spark.jars", gcs_connector_jar)
        .setMaster("local[*]")
    )

    builder = pyspark.sql.SparkSession.builder.appName("LetsTalk").config(conf=conf)
    spark = configure_spark_with_delta_pip(builder).getOrCreate()
    return spark


def create_spark_local_session():
    """Return a "LetsTalk" Spark session configured only with the Delta Lake extension and catalog.

    The session comes from ``getOrCreate()`` on a builder that has been passed
    through ``configure_spark_with_delta_pip``.
    """
    delta_settings = {
        "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
        "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    }

    session_builder = pyspark.sql.SparkSession.builder.appName("LetsTalk")
    for option, value in delta_settings.items():
        session_builder = session_builder.config(option, value)

    return configure_spark_with_delta_pip(session_builder).getOrCreate()



In [3]:
# Start the Delta-enabled local session; `spark` is the handle passed to the loaders and processors below.
spark = create_spark_local_session()
# Eager evaluation makes a bare DataFrame expression at the end of a cell render as a table.
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

In [10]:
import os

# Location of the control table inside the landing zone.
control_table_path = r"C:\Users\josub\Desktop\BDMA\Barcelona\Big_Data_Management\SmallProject\LetsTalk\data\letstalk_landing_zone_bdma\control_table"

# Check existence once, then report either the directory listing or a placeholder message.
control_table_exists = os.path.exists(control_table_path)
control_table_contents = os.listdir(control_table_path) if control_table_exists else 'Path does not exist'
print(f"Path exists: {control_table_exists}")
print(f"Path contents: {control_table_contents}")

Path exists: True
Path contents: ['.part-00000-7543ab6e-2dec-41f0-89ec-e6a6923e6ce8-c000.snappy.parquet.crc', 'part-00000-7543ab6e-2dec-41f0-89ec-e6a6923e6ce8-c000.snappy.parquet', '_delta_log']


In [4]:
def get_control_table_schema():
    """Return the schema of the incremental-load control table.

    Columns, in order:
        source_table            string,    not nullable
        last_processed_version  long,      not nullable
        last_run_ts             timestamp, nullable
    """
    column_specs = [
        ("source_table", StringType(), False),
        ("last_processed_version", LongType(), False),
        ("last_run_ts", TimestampType(), True),
    ]
    return StructType(
        [StructField(name, data_type, nullable) for name, data_type, nullable in column_specs]
    )

from delta.tables import DeltaTable  # explicit, so the guard below does not rely on `from delta import *`

# NOTE(review): the path-check cell (In [10]) points at ...\data\letstalk_landing_zone_bdma\control_table,
# which is a different directory from this one — confirm which location is intended.
control_table_path = r"C:\Users\josub\Desktop\BDMA\Barcelona\Big_Data_Management\SmallProject\LetsTalk\data\control_table"

empty_df = spark.createDataFrame([], get_control_table_schema())

# Initialise the control table only when it does not exist yet. An unguarded
# write uses the default "error if exists" save mode, so re-running this cell
# against an already-created table raised instead of being a no-op.
if not DeltaTable.isDeltaTable(spark, control_table_path):
    empty_df.write.format("delta").save(control_table_path)

# Bare expression: displays the current contents of the control table.
spark.read.format("delta").load(control_table_path)


source_table,last_processed_version,last_run_ts


In [6]:
# Landing-zone root and, relative to it, the Delta table to ingest (entertainment news).
absolute_path_to_landing = r'C:\Users\josub\Desktop\BDMA\Barcelona\Big_Data_Management\SmallProject\LetsTalk\data\letstalk_landing_zone_bdma'
table_subpath = 'delta_news/entertainment'
loader = IncrementalLoader(spark, absolute_path_to_landing, table_subpath)
# On the recorded run this printed "CDF not available — doing full load".
df = loader.get_new_data()
# Peek at the first rows; each Row carries author, content, description, publishedAt, source, title, url, urlToImage.
df.head(5)

CDF not available — doing full load


[Row(author='Steven J. Horowitz', content='Beyoncé kicked off her highly anticipated “Cowboy Carter” tour at Los Angeles’ SoFi Stadium tonight, giving the first look at her 32-date trek spanning nine global cities.\r\nThe show is the first tast… [+1848 chars]', description="Beyoncé kicked off her 'Cowboy Carter' tour at Los Angeles' SoFi Stadium on Apr. 28.", publishedAt='2025-04-29T03:30:00Z', source=Row(id=None, name='Variety'), title='Beyoncé Set List: Every Song Played on ‘Cowboy Carter’ Tour Opener in Los Angeles (UPDATING LIVE) - Variety', url='https://variety.com/2025/music/news/beyonce-set-list-every-song-played-cowboy-carter-tour-1236380629/', urlToImage='https://variety.com/wp-content/uploads/2025/04/GettyImages-2191421242.jpg?w=1000&h=563&crop=1'),
 Row(author='Anthony Nash', content='Pat McAfee is officially ready to return to the ring, and will be doing it at WWE Backlash against a very big foe.\r\nDuring tonights episode of WWE RAW, McAfee addressed the crowd about Gunthe

In [90]:
# Wrap the loaded news DataFrame in a NewsProcessor (presumably provided by the star import of dags.lib.Processer — verify).
processor = NewsProcessor(spark, df)

In [91]:
# News cleaning pipeline. Every return value is discarded, so each step presumably
# updates processor.df in place — verify against the NewsProcessor implementation.
# The steps run in this fixed order; do not reorder without checking their dependencies.
processor.ensure_schema()
processor.remove_clear_duplicates()  # presumably the source of the "Removed 2 simple duplicate(s)" log line
processor.name_to_id()
processor.remove_hidden_duplicates(['url'], ['publishedAt'])  # presumably the source of the "Removed 0 hidden duplicate(s)" log line
processor.normalize_text(['title', 'description', 'content'])
processor.expand_source()
processor.order_by('publishedAt', ascending=False)

INFO:root:Removed 2 simple duplicate(s)
INFO:root:Removed 0 hidden duplicate(s)


In [92]:
# Display the processed news DataFrame (rendered as a table because eager evaluation is enabled on the session).
processor.df

url,author,content,description,publishedAt,source,title,urlToImage
http://deadline.c...,Natalie Oganesyan,the rock amp ro...,the rock roll h...,2025-04-28 03:33:00,deadline,outkast cyndi la...,https://deadline....
http://www.vultur...,Sara Holdren,the new musical r...,real women have ...,2025-04-28 03:30:23,vulture,bright and bold i...,https://pyxis.nym...
https://deadline....,Glenn Garner,the american film...,stars like elle f...,2025-04-27 04:20:01,deadline,afi life achievem...,https://deadline....
https://www.yourt...,Kate Rose,the love horoscop...,sunday s new moon...,2025-04-27 03:03:23,yourtango,love horoscopes a...,https://www.yourt...
https://www.count...,Anna Logan,did you know gigi...,from hollywood to...,2025-04-26 15:17:46,countryliving.com,celebrities se...,https://hips.hear...
https://www.slash...,Kieran Fisher,the original sno...,how the heck did ...,2025-04-26 15:00:00,/film,the rachel zegler...,https://www.slash...
https://tvline.co...,Matt Webb Mitovich,the following con...,fire country s xl...,2025-04-26 05:19:51,tvline,fire country shoc...,https://tvline.co...
https://tvline.co...,Matt Webb Mitovich,the following con...,ncis sydney sho...,2025-04-26 03:00:00,tvline,ncis sydney boss...,https://tvline.co...
https://www.wwno....,Joseph King,the first weekend...,lil wayne will cl...,2025-04-26 00:30:00,wwno.org,lil wayne s new o...,https://npr.brigh...
http://deadline.c...,Anthony D'Alessandro,there are many qu...,george lucas the ...,2025-04-26 00:08:00,deadline,george lucas on w...,https://deadline....


In [82]:
import os
# NOTE(review): macOS-style, machine-specific absolute path, unlike the Windows paths used by the
# earlier cells — consider a single configurable data root.
absolute_path_to_trusted = '/Users/alfio/projects/upc/BDMP2/data/letstalk_trusted_zone_bdma'
# Trusted-zone location of the table currently selected by table_subpath.
save_path = os.path.join(absolute_path_to_trusted, table_subpath)

# Overwrite the url of every row authored by "Sara Holdren" with "https:test.com"; all other urls are kept.
# NOTE(review): this looks like a manual test fixture to give the merge a url it has not seen before —
# confirm, and remove before running against real data.
# NOTE(review): `when` is not among the names imported explicitly in the first cell; it presumably
# arrives through one of the star imports — verify.
processor.df = processor.df.withColumn(
    "url",
    when(col('author') == "Sara Holdren", "https:test.com").otherwise(col("url"))
)

In [84]:
# Merge the processed batch into the trusted table at save_path, matching on 'url'.
# On the recorded run it logged 1 new unique record among overlapping ones and 0 appended non-overlapping records.
processor.merge_with_trusted(save_path, ['url'])

INFO:root:Saving unique records from overlapping ones
INFO:root:Added new 1 unique records
INFO:root:Appending non overlapping records
INFO:root:Adding new 0 records


In [319]:
# Switch to the sports leagues table.
# NOTE: this rebinds absolute_path_to_landing, table_subpath, loader and df, so the news data loaded
# earlier is no longer reachable through `df` after this cell runs.
# NOTE(review): machine-specific macOS path — consider a single configurable data root.
absolute_path_to_landing = '/Users/alfio/projects/upc/BDMP2/data/letstalk_landing_zone_bdma'
table_subpath = 'delta_sports/leagues'
loader = IncrementalLoader(spark, absolute_path_to_landing, table_subpath)
# On the recorded run this printed "CDF not available — doing full load".
df = loader.get_new_data()
# Peek at the first rows; each Row has nested country, league and seasons fields.
df.head(5)

CDF not available — doing full load


[Row(country=Row(code=None, flag=None, name='World'), league=Row(id=4, logo='https://media.api-sports.io/football/leagues/4.png', name='Euro Championship', type='Cup'), seasons=[Row(coverage=Row(fixtures=Row(events=True, lineups=True, statistics_fixtures=False, statistics_players=False), injuries=False, odds=False, players=True, predictions=True, standings=False, top_assists=True, top_cards=True, top_scorers=True), current=False, end='2008-06-29', start='2008-06-07', year=2008), Row(coverage=Row(fixtures=Row(events=True, lineups=True, statistics_fixtures=False, statistics_players=False), injuries=False, odds=False, players=True, predictions=True, standings=False, top_assists=True, top_cards=True, top_scorers=True), current=False, end='2012-07-01', start='2012-06-08', year=2012), Row(coverage=Row(fixtures=Row(events=True, lineups=True, statistics_fixtures=True, statistics_players=True), injuries=False, odds=False, players=True, predictions=True, standings=True, top_assists=True, top_car

In [320]:
# Wrap the leagues DataFrame in a SportsProcessor and run its generation/expansion steps.
sporcessor = SportsProcessor(spark, df)
# Results are kept in `countries` and `leagues`; neither is used by the cells shown in this notebook.
countries = sporcessor.generate_countries()
leagues = sporcessor.generate_leagues()
# Return value discarded; presumably updates sporcessor.df in place — verify.
# The cell printed " Changing code, but probably a manual check could be needed" on the recorded run.
sporcessor.expand()

 Changing code, but probably a manual check could be needed


In [321]:
# Display the sports processor's DataFrame; on the recorded run it has league_id plus league-info_*, coverage_* and fixture_* columns.
sporcessor.df

league_id,league-info_current,league-info_end,league-info_start,league-info_year,coverage_injuries,coverage_odds,coverage_players,coverage_predictions,coverage_standings,coverage_top_assists,coverage_top_cards,coverage_top_scorers,fixture_events,fixture_lineups,fixture_statistics_fixtures,fixture_statistics_players
4,False,2008-06-29,2008-06-07,2008,False,False,True,True,False,True,True,True,True,True,False,False
4,False,2012-07-01,2012-06-08,2012,False,False,True,True,False,True,True,True,True,True,False,False
4,False,2016-07-10,2016-06-10,2016,False,False,True,True,True,True,True,True,True,True,True,True
4,False,2021-07-11,2019-03-21,2020,False,False,True,True,True,True,True,True,True,True,True,True
4,True,2024-07-14,2024-06-14,2024,False,False,True,True,True,True,True,True,True,True,True,True
21,False,2009-06-28,2009-06-14,2009,False,False,True,True,False,True,True,True,True,True,False,False
21,False,2013-06-30,2013-06-15,2013,False,False,True,True,False,True,True,True,True,True,False,False
21,True,2017-07-02,2017-06-17,2017,False,False,True,True,False,True,True,True,True,True,True,False
61,False,2011-05-29,2010-08-07,2010,False,False,True,True,True,True,True,True,True,True,False,False
61,False,2012-05-20,2011-08-06,2011,False,False,True,True,True,True,True,True,True,True,False,False
