# **Waterfall Pipeline.**

## Environment Setup.
Estas librerías proporcionan la base para realizar el procesamiento y análisis de datos de forma eficiente y visualmente comprensible. Su uso combinado permite realizar tareas desde la manipulación de datos hasta su visualización de manera fluida y efectiva en el entorno de Jupyter Notebook.
1. **Pandas**: Librería de Python especializada en el manejo y análisis de datos tabulares. Permite realizar operaciones de transformación, agregación, limpieza y análisis de datos de manera eficiente.

2. **Numpy**: Librería fundamental para la computación científica en Python. Ofrece soporte para arrays multidimensionales, matrices y funciones matemáticas avanzadas.

3. **IPython.display**: Funciones útiles para mostrar contenido interactivo y enriquecido en los notebooks. Con estas funciones, se puede renderizar HTML, mostrar tablas y gráficos de manera directa en el cuaderno de Jupyter.

In [40]:
import pandas as pd
import numpy as np
from IPython.display import display, HTML
import os

## Pipeline Process: Extract, Transform y Load (ETL)
El pipeline de procesamiento de datos se estructura en tres pasos principales: **Extract**, **Transform** y **Load**. Este flujo se automatiza y gestiona utilizando **Azure Data Factory** (ADF) y se complementa con **Azure Databricks** para realizar el procesamiento y transformación de datos de manera eficiente.

1. La primera fase del pipeline consiste en la **extracción** de datos desde diversas fuentes. Estas fuentes pueden incluir bases de datos, archivos en la nube o sistemas externos.

2. Una vez que los datos han sido extraídos, el siguiente paso es la **transformación**. Este paso incluye la limpieza, agregación y modificación de los datos para que sean adecuados para el análisis.

3. Finalmente, después de que los datos han sido transformados, llega el paso de la **carga**. Este paso implica mover los datos procesados a su destino final, que puede ser una base de datos, un sistema de almacenamiento o un sistema de análisis.

### **01. Extract.**
- **Azure Data Factory** gestiona la conexión a las fuentes de datos.
- Se pueden programar procesos de extracción de manera recurrente, asegurando que siempre se tenga acceso a los datos más recientes.
- La extracción de datos se puede automatizar con **triggers** o eventos programados para iniciar el flujo de datos.

In [41]:
fn = "../data/2023_Threat_Report_Final.xlsx"
sheets = ["Table019 (Page 17-19)", "Table021 (Page 20-24)"]
dfs = [pd.read_excel(f"{fn}", sheet_name=sn, dtype=str) for sn in sheets]
df_waterfall = pd.concat(dfs, ignore_index=True)
df_waterfall.head()

Unnamed: 0,Date,Victim,Region,Industry,Threat\nActor,Sites,Cost,OT / Physical\nConsequences,Incident Summary,References,Column11
0,2010,Stuxnet,Iran,Process Mfg.,Nation State,1.0,,Destroyed 1000 centrifuges at,Plant was infected by the Stuxnet worm in a ta...,en.wikipedia.org/,
1,07-15,,,,,,,Natanz,attack designed to disrupt Iran's nuclear enri...,wiki/Stuxnet,
2,,,,,,,,,program,,
3,2012,Iran's main oil export terminals,Iran,Oil & Gas,Nation State,6.0,,Shutdown 6 terminals,6 terminals ops. affected by Flame malware. News,bbc.com/news/world-,iranprimer.usip.org/blog/
4,04-22,,,,,,,,"outlets confirm outage, despite Iran downplayi...",middle-east-59062907,2021/nov/02/israel-iran-


### **02. Transform.**

#### Calculate groups of rows of the same incident.
En este paso, se crean dos columnas clave en el conjunto de datos. Primero, se genera una columna booleana llamada `Is Year`, que indica si el valor de la columna `Date` corresponde a un año de 4 dígitos. Esto se logra mediante una comparación con un patrón específico para identificar los valores que son años válidos. Luego, se crea una columna `id`, inicialmente llena de valores nulos, que se asigna con el índice de las filas que contienen un año. Posteriormente, se utiliza un método de relleno hacia adelante para propagar los identificadores a las filas correspondientes a incidentes relacionados, asegurando que todas las filas de un mismo grupo compartan el mismo `id`.

In [42]:
df_waterfall['is_year'] = df_waterfall['Date'].str.match("\d{4}").fillna(False)
df_waterfall['id'] = np.nan
df_waterfall.loc[df_waterfall['is_year'], 'id'] = df_waterfall.index.to_series()[df_waterfall['is_year']]
df_waterfall['id'] = df_waterfall['id'].ffill().astype(int)

  df_waterfall['is_year'] = df_waterfall['Date'].str.match("\d{4}").fillna(False)


#### Aggregate texts.
En este paso, se agrupan los datos por el identificador `id` y se consolidan los valores de varias columnas (como `Victim`, `Date`, `Region`, `Industry`, etc.) en una sola cadena, separada por espacios o guiones. Esto se hace para cada grupo de incidentes, creando un resumen compacto de la información relevante para cada uno, lo que facilita su análisis posterior.

In [43]:
df_waterfall_processed = pd.DataFrame(index=df_waterfall['id'].drop_duplicates())
g = df_waterfall.groupby('id')
df_waterfall_processed['victim'] = g['Victim'].apply(lambda s: ' '.join(s.dropna()))
df_waterfall_processed['published_date'] = g['Date'].apply(lambda s: '-'.join(s.dropna()))
df_waterfall_processed['region'] = g['Region'].apply(lambda s: ' '.join(s.dropna()))
df_waterfall_processed['industry'] = g['Industry'].apply(lambda s: ' '.join(s.dropna()))
df_waterfall_processed['threat_actor'] = g['Threat\nActor'].apply(lambda s: ' '.join(s.dropna()))
df_waterfall_processed['sites'] = g['Sites'].apply(lambda s: ' '.join(s.dropna()))
df_waterfall_processed['cost'] = g['Cost'].apply(lambda s: ' '.join(s.dropna()))
df_waterfall_processed['ot_or_physical_consecuences'] = g['OT / Physical\nConsequences'].apply(lambda s: ' '.join(s.dropna()))
df_waterfall_processed['incident_summary'] = g['Incident Summary'].apply(lambda s: ' '.join(s.dropna()))

In [44]:
df_waterfall_processed.head()

Unnamed: 0_level_0,victim,published_date,region,industry,threat_actor,sites,cost,ot_or_physical_consecuences,incident_summary
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
0,Stuxnet,2010-07-15,Iran,Process Mfg.,Nation State,1,,Destroyed 1000 centrifuges at Natanz,Plant was infected by the Stuxnet worm in a ta...
3,Iran's main oil export terminals,2012-04-22,Iran,Oil & Gas,Nation State,6,,Shutdown 6 terminals,6 terminals ops. affected by Flame malware. Ne...
7,Unknown Power Plant,2012-10-?,USA,Power,Unknown,1,,Delayed turbine restart (thus power generation...,10 plant PCs were infected by Mariposa malware...
12,German steel mill,2014-12-22,Germany,Metals & Mining,Unknown,1,,"Caused ""massive damage"" to plant equipment",Sophisticated attack using spear phishing and ...
15,"Prykarpattyaoblenergo, Chernivtsioblenergo, Ky...",2015-12-13,Ukraine,Power,Nation State,32,,Power outage lasts up to 6 hours affecting 230...,First publicly known attack on a power grid oc...


#### Join references.
En este paso, se combinan las columnas `References` y `Column11` por grupo, eliminando valores nulos y duplicados, y se almacenan como listas en la columna `references` del DataFrame principal.

In [45]:
dfaux = pd.DataFrame()
dfaux[1] = g['References'].apply(lambda s: ''.join(s.dropna()))
dfaux[2] = g['Column11'].apply(lambda s: ''.join(s.dropna()))
df_waterfall_processed['references'] = dfaux.apply(lambda s: s[s!=''].dropna().drop_duplicates().tolist(), axis=1)

#### Assignment of publication year and metadata.
En este paso, se extrae el año de la columna `Published Date` y se asigna a una nueva columna llamada `Published Year`. Además, se restablece el índice del DataFrame y se agregan dos nuevas columnas, `Waterfall Report Year` y `Waterfall Report Name`, con valores constantes que identifican el año y el nombre del informe, respectivamente.

In [46]:
df_waterfall_processed['published_year'] = df_waterfall_processed['published_date'].str.slice(0,4)
df_waterfall_processed = df_waterfall_processed.reset_index()
df_waterfall_processed['waterfall_report_year'] = '2023'
df_waterfall_processed['waterfall_report_name'] = '2023 Threat Report. ICSSTRIVE & Waterfall Security Solutions'

In [47]:
df_waterfall_processed.head()

Unnamed: 0,id,victim,published_date,region,industry,threat_actor,sites,cost,ot_or_physical_consecuences,incident_summary,references,published_year,waterfall_report_year,waterfall_report_name
0,0,Stuxnet,2010-07-15,Iran,Process Mfg.,Nation State,1,,Destroyed 1000 centrifuges at Natanz,Plant was infected by the Stuxnet worm in a ta...,[en.wikipedia.org/wiki/Stuxnet],2010,2023,2023 Threat Report. ICSSTRIVE & Waterfall Secu...
1,3,Iran's main oil export terminals,2012-04-22,Iran,Oil & Gas,Nation State,6,,Shutdown 6 terminals,6 terminals ops. affected by Flame malware. Ne...,"[bbc.com/news/world-middle-east-59062907, iran...",2012,2023,2023 Threat Report. ICSSTRIVE & Waterfall Secu...
2,7,Unknown Power Plant,2012-10-?,USA,Power,Unknown,1,,Delayed turbine restart (thus power generation...,10 plant PCs were infected by Mariposa malware...,[us-cert.gov/sites/default/files/Monitors/ICS-...,2012,2023,2023 Threat Report. ICSSTRIVE & Waterfall Secu...
3,12,German steel mill,2014-12-22,Germany,Metals & Mining,Unknown,1,,"Caused ""massive damage"" to plant equipment",Sophisticated attack using spear phishing and ...,[bbc.com/news/technology-30575104],2014,2023,2023 Threat Report. ICSSTRIVE & Waterfall Secu...
4,15,"Prykarpattyaoblenergo, Chernivtsioblenergo, Ky...",2015-12-13,Ukraine,Power,Nation State,32,,Power outage lasts up to 6 hours affecting 230...,First publicly known attack on a power grid oc...,[en.wikipedia.org/wiki/2015_Ukraine_power_grid...,2015,2023,2023 Threat Report. ICSSTRIVE & Waterfall Secu...


#### HTML table display.
Se convierte la columna `references` del DataFrame a formato HTML y se muestra como una tabla utilizando el método `display` de IPython.

In [48]:
html_table = df_waterfall_processed[['references']].head(10).to_html()
display(HTML(html_table))

Unnamed: 0,references
0,[en.wikipedia.org/wiki/Stuxnet]
1,"[bbc.com/news/world-middle-east-59062907, iranprimer.usip.org/blog/2021/nov/02/israel-iran-cyber-war-gas-station-attack]"
2,[us-cert.gov/sites/default/files/Monitors/ICS-CERT_Monitor_Oct-Dec2012.pdf]
3,[bbc.com/news/technology-30575104]
4,"[en.wikipedia.org/wiki/2015_Ukraine_power_grid_hack, arstechnica.com/information-technology/2016/01/first-known-hacker-caused-power-outage-signals-troubling-escalation]"
5,"[en.wikipedia.org/wiki/2016_Kyiv_cyberattack, arstechnica.com/information-technology/2017/06/crash-override-malware-may-sabotage-electric-grids-but-its-no-stuxnet]"
6,[industrialcybersecuritypulse.com/facilities/throwback-attack-wannacry-ransomware-takes-renault-nissan-plants-offline]
7,[timesofindia.indiatimes.com/india/indias-largest-container-port-jnpt-hit-by-ransomware/articleshow/59346704.cms]
8,[wired.com/story/notpetya-cyberattack-ukraine-russia-code-crashed-the-world]
9,[theguardian.com/technology/2017/dec/15/triton-hackers-malware-attack-safety-systems-energy-plant]


In [49]:
df_waterfall_processed.head()

Unnamed: 0,id,victim,published_date,region,industry,threat_actor,sites,cost,ot_or_physical_consecuences,incident_summary,references,published_year,waterfall_report_year,waterfall_report_name
0,0,Stuxnet,2010-07-15,Iran,Process Mfg.,Nation State,1,,Destroyed 1000 centrifuges at Natanz,Plant was infected by the Stuxnet worm in a ta...,[en.wikipedia.org/wiki/Stuxnet],2010,2023,2023 Threat Report. ICSSTRIVE & Waterfall Secu...
1,3,Iran's main oil export terminals,2012-04-22,Iran,Oil & Gas,Nation State,6,,Shutdown 6 terminals,6 terminals ops. affected by Flame malware. Ne...,"[bbc.com/news/world-middle-east-59062907, iran...",2012,2023,2023 Threat Report. ICSSTRIVE & Waterfall Secu...
2,7,Unknown Power Plant,2012-10-?,USA,Power,Unknown,1,,Delayed turbine restart (thus power generation...,10 plant PCs were infected by Mariposa malware...,[us-cert.gov/sites/default/files/Monitors/ICS-...,2012,2023,2023 Threat Report. ICSSTRIVE & Waterfall Secu...
3,12,German steel mill,2014-12-22,Germany,Metals & Mining,Unknown,1,,"Caused ""massive damage"" to plant equipment",Sophisticated attack using spear phishing and ...,[bbc.com/news/technology-30575104],2014,2023,2023 Threat Report. ICSSTRIVE & Waterfall Secu...
4,15,"Prykarpattyaoblenergo, Chernivtsioblenergo, Ky...",2015-12-13,Ukraine,Power,Nation State,32,,Power outage lasts up to 6 hours affecting 230...,First publicly known attack on a power grid oc...,[en.wikipedia.org/wiki/2015_Ukraine_power_grid...,2015,2023,2023 Threat Report. ICSSTRIVE & Waterfall Secu...


### **03. Load.**

Se verifica si el código se ejecuta en Azure Databricks comprobando la variable de entorno `DATABRICKS_RUNTIME_VERSION`. Si es así, se convierte el DataFrame `df_waterfall_processed` a un DataFrame de Spark, se crea un metadato con información del informe y se guarda en una tabla de Hive en formato Delta, sobrescribiendo cualquier dato previo.

In [50]:
# Verificar si estamos en un entorno de Azure Databricks
if "DATABRICKS_RUNTIME_VERSION" in os.environ:
    print("Ejecutando en Databricks...")
    df_sp = spark.createDataFrame(df_waterfall_processed)
    userMetadata = f"Waterfall Report '2023 Threat Report. ICSSTRIVE & Waterfall Security Solutions'. Link to documentation: https://denexus.atlassian.net/wiki/spaces/MOD/pages/440107013/Cyber+Incidents+-+version+2023. Link to PDF in Sharepoint: https://netorgft5207081.sharepoint.com/:b:/s/External_ModelDev/EQoLzSXz2KdCr2bZki7twDoB-ml8QQHufOMzGEL27e0smw?e=Qzx62b."
    print(userMetadata)
    df_sp.coalesce(1).write.format("delta").mode("overwrite").option("overwriteSchema","true").option("userMetadata", userMetadata).saveAsTable("hive_metastore.temporal_tables.dkc_waterfall_report_incidents")