# Data Enrichment: Silver Layer to Gold Layer

**Mount the Azure Storage `StorageAccount` containers: `silver_container` (source CSV file) and `gold_container` (output)**
***

In [0]:
# Mount points match the paths used by the later cells (/mnt/silver and /mnt/gold).
# The storage account key is read from a Databricks secret scope, not hard-coded.
dbutils.fs.mount(
    source='wasbs://silver_container@storage_account.blob.core.windows.net',
    mount_point='/mnt/silver',
    extra_configs={'fs.azure.account.key.storage_account.blob.core.windows.net': dbutils.secrets.get('databricksScope', 'MyStorageAccountKey')}
)
dbutils.fs.mount(
    source='wasbs://gold_container@storage_account.blob.core.windows.net',
    mount_point='/mnt/gold',
    extra_configs={'fs.azure.account.key.storage_account.blob.core.windows.net': dbutils.secrets.get('databricksScope', 'MyStorageAccountKey')}
)
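
`dbutils.fs.mount` raises an error when the mount point is already in use, so re-running the cell above fails on the second attempt. A minimal sketch of a guard that checks `dbutils.fs.mounts()` first is shown below. The helper name `mount_if_absent` is hypothetical, and `dbutils` is passed in as a parameter only so that the function can be exercised outside Databricks.

```python
def mount_if_absent(dbutils, source, mount_point, extra_configs):
    """Mount `source` at `mount_point` unless something is already mounted there.

    Returns True if a new mount was created, False if the mount point existed.
    """
    # dbutils.fs.mounts() lists the current mounts; each entry exposes .mountPoint.
    existing = {m.mountPoint for m in dbutils.fs.mounts()}
    if mount_point in existing:
        return False  # already mounted, nothing to do
    dbutils.fs.mount(
        source=source,
        mount_point=mount_point,
        extra_configs=extra_configs,
    )
    return True

# In the notebook this might be called as (same arguments as the mount cell):
# mount_if_absent(dbutils, 'wasbs://...', '/mnt/silver', {...})
```

With this guard the mount cell can be re-run safely, for example after a cluster restart.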


In [0]:
dbutils.fs.ls('/mnt')

[FileInfo(path='dbfs:/mnt/bronze/', name='bronze/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/mnt/gold/', name='gold/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/mnt/silver/', name='silver/', size=0, modificationTime=0)]

**Spark Initialization**
***

In [0]:
spark

**Data Enrichment**
***
- Read the cleaned data from the Azure Storage `Silver` container
- Add new columns
- Drop unnecessary columns

In [0]:
csv_file_path = "dbfs:/mnt/silver/silver_cyber_attack_data.csv"
df = spark.read.option("delimiter",",").option("wholeFile",True).option("multiline",True).option("header",True).option("inferSchema",True).csv(csv_file_path)
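
The `multiline` option only matters when a quoted field contains a line break; without it, Spark treats every physical line as a separate record. The notebook does not say whether this dataset has such fields, so the following is a sketch with made-up sample data. It uses Python's standard `csv` module rather than Spark to show the difference between physical lines and logical records.

```python
import csv
import io

# Hypothetical two-column sample: the quoted "Payload Data" value spans two physical lines.
sample = 'Protocol,Payload Data\nICMP,"first line\nsecond line"\n'

# Splitting on newlines alone breaks the quoted field apart.
physical_lines = sample.splitlines()

# A quote-aware parser keeps the embedded newline inside a single field.
records = list(csv.reader(io.StringIO(sample)))

print(len(physical_lines))  # 3 physical lines
print(len(records))         # 2 logical records (header + one row)
```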

In [0]:
df.show(10)

+-------------------+-----------------+----------------------+-----------+----------------+--------+-------------+-----------+------------+--------------------+------------------+--------------+---------------+-----------+----------------+------------+--------------+-------------------+--------------------+---------------+--------------------+-----------------+-------------+--------------+----------+
+-------------------+-----------------+----------------------+-----------+----------------+--------+-------------+-----------+------------+--------------------+------------------+--------------+---------------+-----------+----------------+------------+--------------+-------------------+--------------------+---------------+--------------------+-----------------+-------------+--------------+----------+
|2023-05-30 06:33:58|    103.216.15.12|          84.9.164.252|      31225|           17616|    ICMP|          503|       Data|        HTTP|Qui natus odio as...|      IoC Detected|         28.6

In [0]:
import re
from pyspark.sql.functions import udf, date_format, split, col
from pyspark.sql.types import StringType, DataType, TimestampType

In [0]:
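# Case-insensitive patterns, checked in list order; the first match wins.
patterns = [r'Windows', r'Linux', r'Android', r'iPad', r'iPod', r'iPhone', r'Macintosh']

def extract_device_or_os(user_agent):
    # Null column values reach a Python UDF as None, which re.search cannot handle.
    if user_agent is None:
        return 'Unknown'
    for pattern in patterns:
        match = re.search(pattern, user_agent, re.I)
        if match:
            return match.group()
    return 'Unknown'

# Wrap the function as a Spark UDF that returns a string column.
extract_device_or_os_udf = udf(extract_device_or_os, StringType())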
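
Because the patterns are tried in list order and `Linux` comes before `Android`, a user agent that contains both tokens (as Android browser strings typically do) is labeled `Linux`. If the more specific label is wanted, one option is to move `Linux` to the end of the list. The sketch below is a plain-Python variant for illustration; `ORDERED_PATTERNS`, `extract_device_or_os_ordered`, and the sample user agent strings are assumptions, not part of the notebook.

```python
import re

# Hypothetical reordering: the generic 'Linux' token is checked last.
ORDERED_PATTERNS = ['Windows', 'Android', 'iPad', 'iPod', 'iPhone', 'Macintosh', 'Linux']

def extract_device_or_os_ordered(user_agent):
    """Return the first matching platform token, or 'Unknown'."""
    if user_agent is None:
        return 'Unknown'
    for pattern in ORDERED_PATTERNS:
        match = re.search(pattern, user_agent, re.I)
        if match:
            return match.group()
    return 'Unknown'

# Illustrative user agent strings (not taken from the dataset).
android_ua = "Mozilla/5.0 (Linux; Android 10; SM-G960F) AppleWebKit/537.36"
windows_ua = "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"

print(extract_device_or_os_ordered(android_ua))  # Android
print(extract_device_or_os_ordered(windows_ua))  # Windows
```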

- Transforming `Device Information` column into `Browser` and `Device/OS`

In [0]:
df = df.withColumn('Browser', split(col('Device Information'),'/')[0])
df = df.withColumn('Device/OS',extract_device_or_os_udf(df['Device Information']))

- Separating `Timestamp` column into `Date` and `Time`

In [0]:
df = df.withColumn('Date', date_format('Timestamp', 'yyyy-MM-dd'))
df = df.withColumn('Time', date_format('Timestamp', 'HH:mm:ss'))
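
`date_format` returns strings, so the new `Date` and `Time` columns are text rather than date or timestamp types. As a rough illustration of what the two patterns produce, here is a plain-Python sketch applied to the timestamp from the first row of the `df.show(10)` output. The `strftime` codes are Python's counterparts of the Spark patterns, and the variable names are illustrative.

```python
from datetime import datetime

# Timestamp taken from the first row of the df.show(10) output.
raw_timestamp = "2023-05-30 06:33:58"
parsed = datetime.strptime(raw_timestamp, "%Y-%m-%d %H:%M:%S")

# Python counterparts of the Spark patterns 'yyyy-MM-dd' and 'HH:mm:ss'.
date_part = parsed.strftime("%Y-%m-%d")
time_part = parsed.strftime("%H:%M:%S")

print(date_part)  # 2023-05-30
print(time_part)  # 06:33:58
```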

- Dropping the unnecessary columns

In [0]:
df = df.drop('Device Information', 'Timestamp')

In [0]:
df.printSchema()

root
 |-- Source IP Address: string (nullable = true)
 |-- Destination IP Address: string (nullable = true)
 |-- Source Port: integer (nullable = true)
 |-- Destination Port: integer (nullable = true)
 |-- Protocol: string (nullable = true)
 |-- Packet Length: integer (nullable = true)
 |-- Packet Type: string (nullable = true)
 |-- Traffic Type: string (nullable = true)
 |-- Payload Data: string (nullable = true)
 |-- Malware Indicators: string (nullable = true)
 |-- Anomaly Scores: double (nullable = true)
 |-- Attack Type: string (nullable = true)
 |-- Attack Signature: string (nullable = true)
 |-- Action Taken: string (nullable = true)
 |-- Severity Level: string (nullable = true)
 |-- User Information: string (nullable = true)
 |-- Network Segment: string (nullable = true)
 |-- Geo-location Data: string (nullable = true)
 |-- Proxy Information: string (nullable = true)
 |-- Firewall Logs: string (nullable = true)
 |-- IDS/IPS Alerts: string (nullable = true)
 |-- Log Source: string (nullable = true)

- Storing the enriched data in the `gold` container in CSV format

In [0]:
gold_container_path = "/mnt/gold/gold_cyber_attack_data.csv"
df.write.mode("overwrite").csv(gold_container_path, header=True)
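
Spark's CSV writer treats the target path as a directory and writes one or more `part-*` files into it, so `gold_cyber_attack_data.csv` ends up as a folder rather than a single file. A minimal sketch of locating the data files afterwards is shown below. `list_part_files` is a hypothetical helper, and `dbutils` is passed in as a parameter only so the function can be exercised outside Databricks.

```python
def list_part_files(dbutils, output_dir):
    """Return the paths of the CSV part files Spark wrote under `output_dir`."""
    return [
        info.path
        for info in dbutils.fs.ls(output_dir)  # FileInfo entries expose .path and .name
        if info.name.startswith("part-") and info.name.endswith(".csv")
    ]

# In the notebook this might be called as:
# list_part_files(dbutils, "/mnt/gold/gold_cyber_attack_data.csv")
```

If a single output file is required, calling `df.coalesce(1)` before the write reduces the output to one part file, at the cost of writing through a single task.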