<h1><center>DATA WRANGLING USING PYSPARK</center></h1>

![Image](https://dimensionless.in/wp-content/uploads/2019/05/pic_data_wrangling.jpg)

# Required Modules

## Install Packages
<p>Restart the kernel after installation</p>

In [1]:
```python
# Uncomment the next line to install PySpark from inside the notebook
# %pip install pyspark
```

## Import Packages

In [2]:
```python
import tarfile as to_extract
import pyspark.sql.functions as F

import pandas as pd
import numpy as np

import matplotlib.pyplot as plt

from pyspark.sql import SparkSession
# Note: importing min and max this way shadows Python's built-in min() and max() in this notebook
from pyspark.sql.functions import split, col, dayofweek, month, when, dense_rank, to_date, length, hour, to_timestamp, unix_timestamp, expr, avg, min, max
from pyspark.sql.types import IntegerType, DateType, TimestampType
from pyspark.sql.window import Window

import warnings
# Hides all Python warnings; comment this out while debugging
warnings.filterwarnings('ignore')
```

# Initiate A PySpark Session

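In [3]:
```python
# Memory settings only take effect if they are set before the first session starts;
# getOrCreate() reuses an existing session, so restart the kernel after changing them
spark = SparkSession.builder \
    .appName("data-wrangling") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memoryOverhead", "1g") \
    .config("spark.executor.memoryOverhead", "1g") \
    .getOrCreate()
```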

__Comment:__

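_The memory settings were raised because the log file is large and Spark's default driver memory (1g) was not enough to process it. When Spark runs in local mode, the executors live inside the driver JVM, so `spark.driver.memory` is the setting that takes effect. The `memoryOverhead` options only apply on cluster managers such as YARN and Kubernetes._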

# Data Directory
- _Change these paths to match where the data is stored on your machine_
- The original data can be accessed from [here](https://zenodo.org/record/3227177/files/BGL.tar.gz).

In [4]:
```python
import os

data_dir = r'C:\Users\User\Desktop\Data'
# Build the file paths from data_dir so only one line needs editing
zipped_log_data = os.path.join(data_dir, 'BGL.tar.gz')
log_file = os.path.join(data_dir, 'BGL.log')
```

# Data Extraction
___Extracting the compressed archive ("BGL.tar.gz") with the "tarfile" module to access the "BGL.log" file___

In [5]:
```python
# "r:gz" opens a gzip-compressed tar archive for reading
with to_extract.open(zipped_log_data, "r:gz") as zipped:
    zipped.extractall(data_dir)
```
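`extractall` trusts the member paths stored inside the archive, and the Python `tarfile` documentation warns against extracting archives from untrusted sources without inspecting them first. The sketch below shows a guarded extraction. It assumes the archive should contain only regular files, and it refuses any member whose resolved path would fall outside the destination directory. The helper name `safe_extract` is hypothetical and is not part of `tarfile`.

```python
import os
import tarfile


def safe_extract(archive_path, dest_dir):
    """Extract a .tar.gz archive, refusing members that would land outside dest_dir.

    Hypothetical helper: assumes the archive should hold only regular files.
    """
    dest_root = os.path.realpath(dest_dir)
    with tarfile.open(archive_path, "r:gz") as archive:
        members = archive.getmembers()
        for member in members:
            # Resolve where this member would be written and require it to stay inside dest_root
            target = os.path.realpath(os.path.join(dest_root, member.name))
            if os.path.commonpath([dest_root, target]) != dest_root:
                raise ValueError(f"Unsafe path in archive: {member.name}")
            # Links can point outside the destination, so reject them outright
            if member.issym() or member.islnk():
                raise ValueError(f"Link member not allowed: {member.name}")
        # Only extract once every member has passed the checks
        archive.extractall(dest_root, members=members)
    return [m.name for m in members]
```

Used here, it would replace the cell above with `safe_extract(zipped_log_data, data_dir)`. On Python 3.12 and later, `extractall(..., filter="data")` provides a comparable built-in safeguard.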

# Data Definition

## Defining The Raw Data

In [6]:
```python
# Each line of the log becomes one row in a single string column named "value"
dat = spark.read.text(log_file)
```
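Before defining the column split, it can help to look at a few raw lines in plain Python and confirm the space-separated layout the transformation step relies on. The sketch below reads only the first `n` lines, so the rest of the large file is never loaded. `peek_lines` is a hypothetical helper, not part of Spark.

```python
from itertools import islice


def peek_lines(path, n=2):
    """Return the first n lines of a text file without reading the rest of it."""
    # errors="replace" keeps a stray non-UTF-8 byte from aborting the peek
    with open(path, "r", encoding="utf-8", errors="replace") as fh:
        return [line.rstrip("\n") for line in islice(fh, n)]
```

Calling `peek_lines(log_file)` should return the same two lines that `dat.show(n=2, truncate=False)` prints.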

## Displaying The Raw Data

In [7]:
```python
dat.show(n=2, truncate=False)
```

```
+---------------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                              |
+---------------------------------------------------------------------------------------------------------------------------------------------------+
|- 1117838570 2005.06.03 R02-M1-N0-C:J12-U11 2005-06-03-15.42.50.363779 R02-M1-N0-C:J12-U11 RAS KERNEL INFO instruction cache parity error corrected|
|- 1117838570 2005.06.03 R02-M1-N0-C:J12-U11 2005-06-03-15.42.50.527847 R02-M1-N0-C:J12-U11 RAS KERNEL INFO instruction cache parity error corrected|
+---------------------------------------------------------------------------------------------------------------------------------------------------+
only showing top 2 rows
```

## Displaying The Schema of The Raw Data

In [8]:
```python
dat.printSchema()
```

```
root
 |-- value: string (nullable = true)
```


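## Raw Data Transformation
- ___Reshaping: splitting each space-separated line into named columns___
- ___Type casting: converting the epoch, date and timestamp fields to integer, date and timestamp types___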

In [9]:
```python
# Split each raw line on single spaces once and reuse the resulting array
parts = split(col("value"), " ")

# One select builds every column in a single projection (repeated withColumn calls grow the query plan)
# and leaves out the original "value" column
dat = dat.select(
    parts[0].alias("Alert message flag"),
    parts[1].cast(IntegerType()).alias("Timestamp"),  # Unix epoch seconds
    to_date(parts[2], "yyyy.MM.dd").alias("Date"),
    parts[3].alias("Node"),
    to_timestamp(parts[4], "yyyy-MM-dd-HH.mm.ss.SSSSSS").alias("Date and Time"),
    parts[5].alias("Node (repeated)"),
    parts[6].alias("Message Type"),
    parts[7].alias("System Component"),
    parts[8].alias("Level"),
    # limit=10 keeps the free-text message, spaces included, in the last element
    split(col("value"), " ", 10).getItem(9).alias("Message Content"),
)
```
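To see what each array index in the Spark expressions refers to, the same split can be reproduced on a single line in plain Python. This sketch is for illustration only and is not part of the Spark pipeline. `parse_bgl_line` and its snake_case keys are hypothetical names. Python's `str.split(" ", 9)` stops after nine splits, which mirrors `split(col("value"), " ", 10)` keeping at most ten elements with the free-text message intact in the last one. The sketch assumes a well-formed line and raises `IndexError` or `ValueError` otherwise.

```python
from datetime import datetime


def parse_bgl_line(line):
    """Split one raw BGL line into the ten fields used by the Spark transformation."""
    # maxsplit=9 yields at most 10 parts; the message text stays whole in parts[9]
    parts = line.split(" ", 9)
    return {
        "alert_message_flag": parts[0],
        "timestamp": int(parts[1]),  # Unix epoch seconds
        "date": datetime.strptime(parts[2], "%Y.%m.%d").date(),
        "node": parts[3],
        # %f accepts the six-digit microsecond field, like Spark's SSSSSS
        "date_and_time": datetime.strptime(parts[4], "%Y-%m-%d-%H.%M.%S.%f"),
        "node_repeated": parts[5],
        "message_type": parts[6],
        "system_component": parts[7],
        "level": parts[8],
        "message_content": parts[9],
    }


# First raw line from the dataset, as printed by dat.show() earlier
sample = ("- 1117838570 2005.06.03 R02-M1-N0-C:J12-U11 2005-06-03-15.42.50.363779 "
          "R02-M1-N0-C:J12-U11 RAS KERNEL INFO instruction cache parity error corrected")
record = parse_bgl_line(sample)
print(record["message_content"])  # instruction cache parity error corrected
```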

## Displaying The Transformed Raw Data

In [10]:
```python
dat.show(n=2, truncate=False)
```

```
+------------------+----------+----------+-------------------+--------------------------+-------------------+------------+----------------+-----+----------------------------------------+
|Alert message flag|Timestamp |Date      |Node               |Date and Time             |Node (repeated)    |Message Type|System Component|Level|Message Content                         |
+------------------+----------+----------+-------------------+--------------------------+-------------------+------------+----------------+-----+----------------------------------------+
|-                 |1117838570|2005-06-03|R02-M1-N0-C:J12-U11|2005-06-03 15:42:50.363779|R02-M1-N0-C:J12-U11|RAS         |KERNEL          |INFO |instruction cache parity error corrected|
|-                 |1117838570|2005-06-03|R02-M1-N0-C:J12-U11|2005-06-03 15:42:50.527847|R02-M1-N0-C:J12-U11|RAS         |KERNEL          |INFO |instruction cache parity error corrected|
+------------------+----------+----------+-------------------+--------------------------+-------------------+------------+----------------+-----+----------------------------------------+
```
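The two time columns describe the same moment in different forms. `Timestamp` holds whole Unix epoch seconds, while `Date and Time` is a wall-clock reading with microseconds. Converting the first row's epoch value with Python's standard library gives 22:42:50 UTC, which is seven hours ahead of the logged 15:42:50. That gap suggests `Date and Time` was recorded at a UTC-7 offset. This is an inference from the rows shown here, not something the dataset states. A quick check:

```python
from datetime import datetime, timezone

epoch_seconds = 1117838570                       # "Timestamp" value from the first row
logged_local = datetime(2005, 6, 3, 15, 42, 50)  # "Date and Time" from the first row, microseconds dropped

# Interpret the epoch value as UTC
as_utc = datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)
print(as_utc)  # 2005-06-03 22:42:50+00:00

# Difference between the UTC reading and the naive logged wall-clock time
offset = as_utc.replace(tzinfo=None) - logged_local
print(offset)  # 7:00:00
```

Within Spark, `F.timestamp_seconds(col("Timestamp"))` (Spark 3.1+) converts the epoch column to a timestamp. Spark displays that timestamp in the session time zone (`spark.sql.session.timeZone`), so it is worth setting that zone explicitly before comparing the two columns.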

## Displaying The Final Schema

In [11]:
```python
dat.printSchema()
```

```
root
 |-- Alert message flag: string (nullable = true)
 |-- Timestamp: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Node: string (nullable = true)
 |-- Date and Time: timestamp (nullable = true)
 |-- Node (repeated): string (nullable = true)
 |-- Message Type: string (nullable = true)
 |-- System Component: string (nullable = true)
 |-- Level: string (nullable = true)
 |-- Message Content: string (nullable = true)
```

