In [0]:
from pyspark.sql.functions import col, explode, when
from pyspark.sql import functions as F
import spacy
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import StringType
import pandas as pd
from pyspark.sql.functions import year
from pyspark.sql.functions import col, monotonically_increasing_id
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col




In [0]:
!python -m spacy download en_core_web_sm

Collecting en-core-web-sm==3.7.1
  Downloading https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.7.1/en_core_web_sm-3.7.1-py3-none-any.whl (12.8 MB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/12.8 MB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.2/12.8 MB[0m [31m7.2 MB/s[0m eta [36m0:00:02[0m[2K     [91m━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━━[0m [32m6.1/12.8 MB[0m [31m89.0 MB/s[0m eta [36m0:00:01[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m12.8/12.8 MB[0m [31m208.3 MB/s[0m eta [36m0:00:01[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m12.8/12.8 MB[0m [31m208.3 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.8/12.8 MB[0m [31m88.5 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: en-core-web-sm
Successfully 

#Lectura de nuestros datos almacenados en el storage account

In [0]:
abfs_path = f"abfss://articles@inetumau.dfs.core.windows.net/archivo.json"
df = spark.read.format("json").option("inferSchema", "true").load(abfs_path)


#Extraccion del campo author de la data leida

In [0]:
df = df.withColumn("author", explode(col("authors")))
df = df.withColumn("author_name", col("author")["name"])


#Instancia de  clase spacy para el proceso NLP

In [0]:

nlp = spacy.load("en_core_web_sm")

In [0]:
categories = {
    "science": ["NASA", "astronomy", "telescope", "research", "space exploration", "planets", "galaxy", "physicists", "theory", "study", "astrophysicist", "space laboratory"],
    "missions": ["launch", "rocket", "mission", "crew", "space station", "rovers", "moon", "mars", "jupiter", "satellites", "orbiter", "lunar exploration", "space travel"],
    "technology": ["technology", "innovation", "reusable rockets", "artificial intelligence", "automation", "sensors", "navigation", "space systems", "advanced materials"],
    "politics": ["government", "regulation", "space policy", "funding", "legislation", "congress", "alliances", "international agreements", "space competition", "space industry", "corporations"],
    "commercial": ["company", "SpaceX", "Blue Origin", "private", "commercial launches", "private industry", "private rocket", "investors", "business development", "space market"],
    "security": ["defense", "military satellites", "national security", "cybersecurity", "military technology", "space warfare", "orbital surveillance", "space systems", "weapon systems"],
    "astronomy": ["star", "galaxy", "telescope", "observation", "exoplanets", "stellar clusters", "black holes", "nebula", "radio telescope", "spectroscopy", "search for life"],
    "climate and environment": ["climate change", "space weather", "weather satellites", "earth observation", "thermal waters", "global temperature", "solar cycle", "space climate", "solar energy"],
    "mars exploration": ["Mars", "rovers", "landing", "explorer", "martian surface", "water on Mars", "Martian atmosphere", "mission to Mars", "Curiosity", "Perseverance"],
    "astronautics": ["astronaut", "International Space Station", "spacelab", "crewed mission", "life support system", "space module", "microgravity experiments"],
    "space future": ["colonization", "terraforming", "space life", "lunar station", "nuclear rockets", "interplanetary exploration", "space transportation", "advanced propulsion systems"]
}

#Definicion de nuestras funciones que van a permitir extraer palabras claves, identificacion entidades y clasificacion de articulos

In [0]:
def extract_entities(text, entity_type):
    if not text:
        return ""
    doc = nlp(text)
    return ", ".join(set(ent.text for ent in doc.ents if ent.label_ == entity_type))

In [0]:
def extract_keywords(text):
    if not text:
        return ""
    doc = nlp(text)
    return ", ".join(set(token.text for token in doc if token.pos_ in ['NOUN', 'ADJ']))

In [0]:
def classif_text(text):
    if not text:
        return ""
    doc = nlp(text)
    category_scores = {category: 0 for category in categories}
    
    for token in doc:
        token_text = token.text.strip().lower()
        for category, keywords in categories.items():
            for keyword in keywords:
                if keyword.lower() in token_text:
                    category_scores[category] += 1
    category = max(category_scores, key=category_scores.get)
    
    return category


In [0]:
@pandas_udf(StringType())
def extract_org_udf(texts: pd.Series) -> pd.Series:
    return texts.apply(lambda x: extract_entities(x, "ORG"))

@pandas_udf(StringType())
def extract_person_udf(texts: pd.Series) -> pd.Series:
    return texts.apply(lambda x: extract_entities(x, "PERSON"))

@pandas_udf(StringType())
def extract_place_udf(texts: pd.Series) -> pd.Series:
    return texts.apply(lambda x: extract_entities(x, "LOC"))

@pandas_udf(StringType())
def extract_keywords_udf(texts: pd.Series) -> pd.Series:
    return texts.apply(lambda x: extract_keywords(x))

@pandas_udf(StringType())
def classify(texts: pd.Series) -> pd.Series:
    return texts.apply(lambda x: classif_text(x))

In [0]:
df = df.withColumn("entity_company", extract_org_udf(col("summary")))
df = df.withColumn("entity_people", extract_person_udf(col("summary")))
df = df.withColumn("entity_place", extract_place_udf(col("summary")))
df = df.withColumn("key_words", extract_keywords_udf(col("summary")))
df = df.withColumn("category", classify(col("summary")))

#Filtrado y limpieza de nuestro Dataframe final 

In [0]:
df_final = df.select("id",
                     "summary",
                     "news_site",
                     "published_at",
                     "updated_at",
                     "title",
                     "type",
                     "author_name",                     
                     when(col("entity_company").isNull() | (col("entity_company") == ""), "None").otherwise(col("entity_company")).alias("entity_company"),                     
                     when(col("entity_people").isNull() | (col("entity_people") == ""), "None").otherwise(col("entity_people")).alias("entity_people"),                     
                     when(col("entity_place").isNull() | (col("entity_place") == ""), "None").otherwise(col("entity_place")).alias("entity_place"),                     
                     when(col("key_words").isNull() | (col("key_words") == ""), "None").otherwise(col("key_words")).alias("key_words"),
                     "category")

In [0]:
display(df_final.limit(10))

id,summary,news_site,published_at,updated_at,title,type,author_name,entity_company,entity_people,entity_place,key_words,category
28946,Viasat won its first task order under the Proliferated Low Earth Orbit (PLEO) satellite services contract The post U.S. Space Force awards Viasat $3.5 million satellite services contract appeared first on SpaceNews.,SpaceNews,2025-02-03T13:01:00Z,2025-02-03T13:10:19.266532Z,U.S. Space Force awards Viasat $3.5 million satellite services contract,article,Sandra Erwin,the Proliferated Low Earth Orbit,Viasat,SpaceNews,"awards, post, first, order, services, satellite, task, contract",science
28945,"NASA has directed a set of science committees to pause their work, citing recent Trump administration executive orders, a move that canceled one meeting and put others on hold. The post NASA pauses work of science groups, citing Trump executive orders appeared first on SpaceNews.",SpaceNews,2025-02-03T12:41:06Z,2025-02-03T12:50:17.830325Z,"NASA pauses work of science groups, citing Trump executive orders",article,Jeff Foust,"Trump, NASA",,SpaceNews,"groups, work, move, post, science, meeting, recent, committees, executive, others, orders, administration, hold, set",science
28944,"In a rapidly evolving space industry defined by mounting competition and compressed timelines, CubeSpace (CS) is transforming satellite control for SmallSats. A global leader in satellite Attitude Determination and Control […] The post CubeSpace: Revolutionizing Satellite Control for the SmallSat Market through Latest Investment appeared first on SpaceNews.",SpaceNews,2025-02-03T12:00:00Z,2025-02-03T12:10:12.148769Z,CubeSpace: Revolutionizing Satellite Control for the SmallSat Market through Latest Investment,article,CubeSpace Satellite Systems,"CubeSpace, Revolutionizing Satellite Control, SmallSats, Attitude Determination, CS, Latest Investment",,SpaceNews,"industry, leader, competition, post, timelines, satellite, global, control, SmallSats, space",science
28943,"For astronauts aboard the International Space Station, staying connected to loved ones and maintaining a sense of normalcy is critical. That is where Tandra Gill Spain, a computer resources senior project manager in NASA’s Avionics and Software Office, comes in. Spain leads the integration of applications on Apple devices and the hardware integration on the […]",NASA,2025-02-03T12:00:00Z,2025-02-03T12:00:15.395412Z,"Station Nation: Meet Tandra Gill Spain, Computer Resources Senior Project Manager in the Avionics and Software Office",article,NASA,"Avionics and Software Office, Apple, the International Space Station, NASA",Tandra Gill,,"normalcy, critical, devices, computer, sense, applications, hardware, project, ones, senior, manager, integration, astronauts",science
28942,"‘Here there be dragons’ – medieval mapmakers would daub such warnings on dangerous or unknown recesses of mariners’ charts. Fast forward to the 21st century and the space surrounding our planet holds ‘dragons’ of its own: fierce radiation belts that surround Earth, Jupiter and other planets, and storms of particles originating from the Sun that travel far into space. A dedicated class of engineers work to safeguard space missions from such dangers.",ESA,2025-02-03T08:43:00Z,2025-02-03T08:50:13.667259Z,Here there be radiation dragons,article,ESA,Sun,,"Earth, Jupiter","mariners, dangers, medieval, mapmakers, dangerous, fierce, particles, belts, missions, other, warnings, own, recesses, such, charts, century, unknown, class, dedicated, 21st, dragons, radiation, planet, planets, storms, space, engineers",missions
28947,"A list of the top 10 global regions where natural or anthropogenic sources emit methane on a continuous, ‘persistent’ basis was recently published in a scientific journal.",ESA,2025-02-03T07:44:00Z,2025-02-03T13:20:16.774479Z,Top 10 persistent methane sources,article,ESA,,,,"journal, anthropogenic, sources, natural, top, list, global, regions, methane, persistent, basis, scientific, continuous",science
28941,"HELSINKI — Japan’s flagship H3 rocket successfully launched the Michibiki 6 navigation satellite early Sunday, enhancing the country’s regional GPS capabilities. The Mitsubishi Heavy Industries (MHI) H3 rocket lifted off […] The post Japan launches Michibiki 6 navigation satellite with fifth H3 rocket appeared first on SpaceNews.",SpaceNews,2025-02-02T22:13:09Z,2025-02-03T10:22:31.880706Z,Japan launches Michibiki 6 navigation satellite with fifth H3 rocket,article,Andrew Jones,"The Mitsubishi Heavy Industries, MHI",,SpaceNews,"flagship, country, post, satellite, rocket, early, regional, fifth, capabilities, navigation",missions
28940,"Here is SpacePolicyOnline.com’s list of space policy events for the week of February 2-8, 2025 and any insight we can offer about them. The House and Senate are in session […]",SpacePolicyOnline.com,2025-02-02T17:19:24Z,2025-02-02T17:20:11.093290Z,"What’s Happening in Space Policy February 2-8, 2025",article,Marcia Smith,"House, Senate",,,"events, policy, list, week, session, insight, space",science
28939,A recently launched Indian navigation satellite is stranded in a transfer orbit after the failure of its onboard propulsion system and could soon renter. The post Indian navigation satellite stuck in transfer orbit after propulsion failure appeared first on SpaceNews.,SpaceNews,2025-02-02T15:55:11Z,2025-02-03T10:22:48.522733Z,Indian navigation satellite stuck in transfer orbit after propulsion failure,article,Jeff Foust,,,SpaceNews,"orbit, propulsion, onboard, post, satellite, system, Indian, failure, transfer, navigation",technology
28938,Space company executives project a new wave of investment and consolidation that could be a “more thoughtful” version of the previous surge five years ago. The post Industry executives predict “more thoughtful” new wave of space deals appeared first on SpaceNews.,SpaceNews,2025-02-02T01:00:31Z,2025-02-02T01:10:10.259430Z,Industry executives predict “more thoughtful” new wave of space deals,article,Jeff Foust,,,SpaceNews,"wave, new, years, post, thoughtful, previous, deals, consolidation, version, executives, surge, space, company, investment",commercial


# Tendencias de Temas por tiempo

In [0]:
df_time = df_final.withColumn("published_at", F.to_timestamp("published_at"))

In [0]:
df_tend = df_time.groupBy(F.month("published_at").alias("month"), "category").count()
df_tend = df_tend.orderBy("month", F.desc("count"))

display(df_tend)

month,category,count
1,science,112
1,missions,67
1,commercial,5
1,technology,5
1,astronomy,4
1,security,4
1,politics,3
1,astronautics,1
2,science,12
2,missions,7


#Analisis de fuentes mas activas

In [0]:
#Analisis de fuentes mas activas
df_sources = df_final.groupBy("news_site").count()
df_sources=df_sources.orderBy(F.desc("count"))
display(df_sources)

news_site,count
SpaceNews,85
NASA,60
ESA,30
European Spaceflight,11
Arstechnica,9
Spaceflight Now,8
NASASpaceflight,8
SpacePolicyOnline.com,5
The Launch Pad,5
Space Scout,1


#Particion y almacenamiento de datos historicos en formato parquet en el data lake

In [0]:
#Particion de datos historicos en formato parquet
abfs_path = "abfss://articles@inetumau.dfs.core.windows.net/historical_data/"
df_time = df_time.withColumn("year", year("published_at"))
df_time.write.mode("overwrite").partitionBy("year").parquet(abfs_path)

#Cacheo de datos

In [0]:
#Cacheo de datos
df_final.cache()

DataFrame[id: bigint, summary: string, news_site: string, published_at: string, updated_at: string, title: string, type: string, author_name: string, entity_company: string, entity_people: string, entity_place: string, key_words: string, category: string]

#Creacion de las dimensiones y tabla de hechos en Dtabricks SQL

In [0]:
"""%sql
create table autor.autors.autor(
  id_autor int PRIMARY KEY,
  author_name string  
)
PARTITIONED BY (id_autor);

create table autor.autors.site(
  id_site int PRIMARY KEY,
  news_site string  
)
PARTITIONED BY (id_site);

create table autor.autors.category(
  id_category int PRIMARY KEY,
  category string  
)
PARTITIONED BY (id_category);"""

'%sql\ncreate table autor.autors.autor(\n  id_autor int PRIMARY KEY,\n  author_name string  \n);\n\ncreate table autor.autors.site(\n  id_site int PRIMARY KEY,\n  news_site string  \n);\n\ncreate table autor.autors.category(\n  id_category int PRIMARY KEY,\n  category string  \n)'

In [0]:
"""%sql
create table autor.autors.hechos(
  id_autor int foreign key references autor.autors.autor(id_autor),
  id_site int foreign key references autor.autors.site(id_site),
  id_category int foreign key references autor.autors.category(id_category),
  cantidad_noticias_autor int,
  cantidad_noticias_categoria int,
  cantidad_noticias_site int)
  PARTITIONED BY (id_autor,id_site,id_category);"""

#Poblado de la dimension autor

In [0]:
df_authors = df_final.select("author_name").distinct()
window_spec = Window.orderBy("author_name")
df_authors = df_authors.withColumn("id_autor", row_number().over(window_spec))





In [0]:
df_authors.write.mode("overwrite").saveAsTable("autor.autors.autor")

#Poblado de la dimension category

In [0]:
df_categories = df_final.select("category").distinct()
window_spec = Window.orderBy("category")
df_categories = df_categories.withColumn("id_category", row_number().over(window_spec))


In [0]:
display(df_categories)

category,id_category
astronautics,1
astronomy,2
commercial,3
missions,4
politics,5
science,6
security,7
technology,8


In [0]:
df_categories.write.mode("overwrite").saveAsTable("autor.autors.category")

#Poblado de la dimension site

In [0]:
df_site = df_final.select("news_site").distinct()
window_spec = Window.orderBy("news_site")
df_site = df_site.withColumn("id_site", row_number().over(window_spec))

In [0]:
df_site.write.mode("overwrite").saveAsTable("autor.autors.site")

In [0]:
%sql
select * from autor.autors.autor;


id_site,news_site
1,Arstechnica
2,ESA
3,European Spaceflight
4,NASA
5,NASASpaceflight
6,Space Scout
7,SpaceNews
8,SpacePolicyOnline.com
9,Spaceflight Now
10,The Launch Pad


In [0]:
%sql
select * from autor.autors.category;


In [0]:
%sql
select * from autor.autors.site

#Poblado de la dimension Hechos.

In [0]:

%python
df_final.createOrReplaceTempView("df_final")



In [0]:
"""%sql
INSERT INTO autor.autors.hechos(id_autor,id_site,id_category,cantidad_noticias_autor,cantidad_noticias_categoria,cantidad_noticias_site)
SELECT 
    a.id_autor,
    s.id_site,
    c.id_category,
    COUNT(df.author_name) AS cantidad_noticias_autor,
    COUNT(*) OVER(PARTITION BY c.id_category) AS cantidad_noticias_categoria,
    COUNT(*) OVER(PARTITION BY s.id_site) AS cantidad_noticias_site
FROM df_final df
JOIN autor.autors.autor a ON (df.author_name = a.author_name)
JOIN autor.autors.site s ON (df.news_site = s.news_site)
JOIN autor.autors.category c ON (df.category = c.category)
GROUP BY a.id_autor, s.id_site, c.id_category;"""

num_affected_rows,num_inserted_rows
58,58


In [0]:
%sql
select * from autor.autors.hechos;

id_autor,id_site,id_category,cantidad_noticias_autor,cantidad_noticias_categoria,cantidad_noticias_site
9,2,1,1,1,5
25,4,2,1,4,3
29,7,2,1,4,34
19,7,2,1,4,34
7,7,2,1,4,34
9,2,3,1,5,5
27,5,3,1,5,7
18,7,3,1,5,34
28,7,3,1,5,34
19,7,3,2,5,34


#Analisis SQL

##Tendencia de temas por mes

In [0]:
%sql
SELECT 
    DATE_TRUNC('month', df.published_at) AS month,
    c.category,
    COUNT(*) AS tendencia_count
FROM df_final df
JOIN autor.autors.category c ON (df.category = c.category)
GROUP BY month, c.category
ORDER BY month, tendencia_count DESC;


month,category,tendencia_count
2025-01-01T00:00:00Z,science,112
2025-01-01T00:00:00Z,missions,67
2025-01-01T00:00:00Z,commercial,5
2025-01-01T00:00:00Z,technology,5
2025-01-01T00:00:00Z,astronomy,4
2025-01-01T00:00:00Z,security,4
2025-01-01T00:00:00Z,politics,3
2025-01-01T00:00:00Z,astronautics,1
2025-02-01T00:00:00Z,science,12
2025-02-01T00:00:00Z,missions,7


##Fuentes mas influyentes

In [0]:
%sql
SELECT 
    s.news_site,
    COUNT(*) AS count_sources
FROM df_final df
JOIN autor.autors.site s ON (df.news_site = s.news_site)
GROUP BY s.news_site
ORDER BY count_sources DESC;


news_site,cantidad_noticias
SpaceNews,85
NASA,60
ESA,30
European Spaceflight,11
Arstechnica,9
Spaceflight Now,8
NASASpaceflight,8
SpacePolicyOnline.com,5
The Launch Pad,5
Space Scout,1
