# Dataframe Manipulation

In this project, we learn how to instantiate a Spark session and manipulate DataFrames, which we will use later to build an end-to-end data pipeline.

## Loading CSV and JSON Files


### Instantiate SparkSession

1. **Import PySpark**: Use `pyspark.sql` to leverage Spark's data processing capabilities.
2. **Create a Spark Session**: `SparkSession.builder.getOrCreate()` returns the existing session if one is active and otherwise creates a new one. The session runs in the driver process, which schedules tasks across the cluster.
3. **Purpose**: The `spark` object is the entry point for interacting with Spark for data analysis and processing.

In [2]:
# import pyspark library
from pyspark.sql import SparkSession

# create (or reuse) the SparkSession on the driver
spark = SparkSession.builder.getOrCreate()
spark

24/11/19 16:48:11 WARN Utils: Your hostname, Doomzies-2.local resolves to a loopback address: 127.0.0.1; using 192.168.68.101 instead (on interface en0)
24/11/19 16:48:11 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/19 16:48:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Loading and Displaying a CSV File in Spark

1. **File Path**: Specify the file path (`location_temp_path`) for the CSV file to be loaded.
2. **Reading Data**: 
   - Use `spark.read.format("csv")` to load the CSV file.
   - The `option("header", "true")` ensures the first row is treated as column headers.
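
To make the effect of the `header` option concrete, here is a minimal plain-Python sketch that uses the standard `csv` module rather than Spark. The sample text reuses the first rows printed for `location_temp.csv`; the variable names are illustrative only. It also shows that, without type inference, every value is read as a string.

```python
import csv
import io

# First rows of location_temp.csv, as printed in this notebook's output.
sample = (
    "event_date,location_id,temp_celcius\n"
    "03/04/2019 19:48:06,loc0,29\n"
    "03/04/2019 19:53:06,loc0,27\n"
)

# Header treated as column names (analogous to header="true").
with_header = list(csv.DictReader(io.StringIO(sample)))

# Header treated as data (analogous to header="false"): the first
# record is then the header text itself.
without_header = list(csv.reader(io.StringIO(sample)))

print(with_header[0]["temp_celcius"])  # the string '29', not the integer 29
print(without_header[0])               # the header row appears as data
```

This is only an analogy for the option's behaviour, not a description of how Spark parses CSV internally.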


In [3]:
# load location_temp.csv to df
location_temp_path = './data/location_temp.csv'
df_1 = spark.read.format("csv").option(
    "header", "true").load(location_temp_path)  # specify the format to be either 'csv' or 'json'

df_1.show()

+-------------------+-----------+------------+
|         event_date|location_id|temp_celcius|
+-------------------+-----------+------------+
|03/04/2019 19:48:06|       loc0|          29|
|03/04/2019 19:53:06|       loc0|          27|
|03/04/2019 19:58:06|       loc0|          28|
|03/04/2019 20:03:06|       loc0|          30|
|03/04/2019 20:08:06|       loc0|          27|
|03/04/2019 20:13:06|       loc0|          27|
|03/04/2019 20:18:06|       loc0|          27|
|03/04/2019 20:23:06|       loc0|          29|
|03/04/2019 20:28:06|       loc0|          32|
|03/04/2019 20:33:06|       loc0|          35|
|03/04/2019 20:38:06|       loc0|          32|
|03/04/2019 20:43:06|       loc0|          28|
|03/04/2019 20:48:06|       loc0|          28|
|03/04/2019 20:53:06|       loc0|          32|
|03/04/2019 20:58:06|       loc0|          34|
|03/04/2019 21:03:06|       loc0|          33|
|03/04/2019 21:08:06|       loc0|          27|
|03/04/2019 21:13:06|       loc0|          28|
|03/04/2019 2

In [4]:
# load utilization.csv into df
utilization_path = "./data/utilization.csv"
df_2 = spark.read.format("csv").option("header", "false").option(
    "inferSchema", "true").load(utilization_path)

df_2.show()

                                                                                

+-------------------+---+----+----+---+
|                _c0|_c1| _c2| _c3|_c4|
+-------------------+---+----+----+---+
|03/05/2019 08:06:14|100|0.57|0.51| 47|
|03/05/2019 08:11:14|100|0.47|0.62| 43|
|03/05/2019 08:16:14|100|0.56|0.57| 62|
|03/05/2019 08:21:14|100|0.57|0.56| 50|
|03/05/2019 08:26:14|100|0.35|0.46| 43|
|03/05/2019 08:31:14|100|0.41|0.58| 48|
|03/05/2019 08:36:14|100|0.57|0.35| 58|
|03/05/2019 08:41:14|100|0.41| 0.4| 58|
|03/05/2019 08:46:14|100|0.53|0.35| 62|
|03/05/2019 08:51:14|100|0.51| 0.6| 45|
|03/05/2019 08:56:14|100|0.32|0.37| 47|
|03/05/2019 09:01:14|100|0.62|0.59| 60|
|03/05/2019 09:06:14|100|0.66|0.72| 57|
|03/05/2019 09:11:14|100|0.54|0.54| 44|
|03/05/2019 09:16:14|100|0.29| 0.4| 47|
|03/05/2019 09:21:14|100|0.43|0.68| 66|
|03/05/2019 09:26:14|100|0.49|0.66| 65|
|03/05/2019 09:31:14|100|0.64|0.55| 66|
|03/05/2019 09:36:14|100|0.42| 0.6| 42|
|03/05/2019 09:41:14|100|0.55|0.59| 63|
+-------------------+---+----+----+---+
only showing top 20 rows
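
With `inferSchema` set to `"true"`, Spark scans the data and guesses a type for each column instead of reading everything as a string. The sketch below is a deliberately simplified, plain-Python illustration of that idea; the helper `infer_type` is hypothetical and is not Spark's actual inference logic. The raw values are taken from the first rows of the table above.

```python
def infer_type(values):
    """Return 'int', 'double', or 'string' for a column of raw strings."""
    def all_parse(cast):
        # True only if every value in the column can be converted.
        try:
            for v in values:
                cast(v)
            return True
        except ValueError:
            return False

    if all_parse(int):
        return "int"
    if all_parse(float):
        return "double"
    return "string"


# Raw values from the first two rows of utilization.csv shown above.
columns = {
    "_c0": ["03/05/2019 08:06:14", "03/05/2019 08:11:14"],
    "_c1": ["100", "100"],
    "_c2": ["0.57", "0.47"],
    "_c3": ["0.51", "0.62"],
    "_c4": ["47", "43"],
}

inferred = {name: infer_type(vals) for name, vals in columns.items()}
print(inferred)
```

The result is consistent with the `Row` objects printed later for `df_2`, where `_c1` and `_c4` are integers, `_c2` and `_c3` are floats, and `_c0` remains a string.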



Next, we compare the first rows and the row counts of the two DataFrames.

In [5]:
# show the first 10 entries in csv file
df_1.head(10)

[Row(event_date='03/04/2019 19:48:06', location_id='loc0', temp_celcius='29'),
 Row(event_date='03/04/2019 19:53:06', location_id='loc0', temp_celcius='27'),
 Row(event_date='03/04/2019 19:58:06', location_id='loc0', temp_celcius='28'),
 Row(event_date='03/04/2019 20:03:06', location_id='loc0', temp_celcius='30'),
 Row(event_date='03/04/2019 20:08:06', location_id='loc0', temp_celcius='27'),
 Row(event_date='03/04/2019 20:13:06', location_id='loc0', temp_celcius='27'),
 Row(event_date='03/04/2019 20:18:06', location_id='loc0', temp_celcius='27'),
 Row(event_date='03/04/2019 20:23:06', location_id='loc0', temp_celcius='29'),
 Row(event_date='03/04/2019 20:28:06', location_id='loc0', temp_celcius='32'),
 Row(event_date='03/04/2019 20:33:06', location_id='loc0', temp_celcius='35')]

In [6]:
# determine the number of entries in csv file
df_1.count()

500000

In [7]:
df_2.head(10)

[Row(_c0='03/05/2019 08:06:14', _c1=100, _c2=0.57, _c3=0.51, _c4=47),
 Row(_c0='03/05/2019 08:11:14', _c1=100, _c2=0.47, _c3=0.62, _c4=43),
 Row(_c0='03/05/2019 08:16:14', _c1=100, _c2=0.56, _c3=0.57, _c4=62),
 Row(_c0='03/05/2019 08:21:14', _c1=100, _c2=0.57, _c3=0.56, _c4=50),
 Row(_c0='03/05/2019 08:26:14', _c1=100, _c2=0.35, _c3=0.46, _c4=43),
 Row(_c0='03/05/2019 08:31:14', _c1=100, _c2=0.41, _c3=0.58, _c4=48),
 Row(_c0='03/05/2019 08:36:14', _c1=100, _c2=0.57, _c3=0.35, _c4=58),
 Row(_c0='03/05/2019 08:41:14', _c1=100, _c2=0.41, _c3=0.4, _c4=58),
 Row(_c0='03/05/2019 08:46:14', _c1=100, _c2=0.53, _c3=0.35, _c4=62),
 Row(_c0='03/05/2019 08:51:14', _c1=100, _c2=0.51, _c3=0.6, _c4=45)]

In [8]:
df_2.count()

24/11/19 16:48:23 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


500000

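Because `utilization.csv` was loaded without a header row, Spark assigned the default column names `_c0` to `_c4`. We rename the columns in `df_2` as follows: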

In [9]:
df_2 = df_2.withColumnRenamed("_c0", "event_datetime") \
    .withColumnRenamed("_c1", "server_id")       \
    .withColumnRenamed("_c2", "cpu_utilization")  \
    .withColumnRenamed("_c3", "free_memory")      \
    .withColumnRenamed("_c4", "session_count")

In [10]:
df_2.show()

+-------------------+---------+---------------+-----------+-------------+
|     event_datetime|server_id|cpu_utilization|free_memory|session_count|
+-------------------+---------+---------------+-----------+-------------+
|03/05/2019 08:06:14|      100|           0.57|       0.51|           47|
|03/05/2019 08:11:14|      100|           0.47|       0.62|           43|
|03/05/2019 08:16:14|      100|           0.56|       0.57|           62|
|03/05/2019 08:21:14|      100|           0.57|       0.56|           50|
|03/05/2019 08:26:14|      100|           0.35|       0.46|           43|
|03/05/2019 08:31:14|      100|           0.41|       0.58|           48|
|03/05/2019 08:36:14|      100|           0.57|       0.35|           58|
|03/05/2019 08:41:14|      100|           0.41|        0.4|           58|
|03/05/2019 08:46:14|      100|           0.53|       0.35|           62|
|03/05/2019 08:51:14|      100|           0.51|        0.6|           45|
|03/05/2019 08:56:14|      100|       

Next, we save both DataFrames in JSON format. Spark writes each one as a directory of part files at the given path.

In [11]:
json_df1_path = "./data/location_temp.json"
df_1.write.json(json_df1_path)

json_df2_path = "./data/utilization.json"
df_2.write.json(json_df2_path)
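
Spark's JSON writer emits one JSON object per line (often called JSON Lines) rather than a single JSON array. The following plain-Python sketch, which uses the standard `json` module and two rows of `df_1` as printed earlier, shows what that layout looks like and how it is read back. It is an illustration of the file format only, not of Spark's writer.

```python
import json

# Two rows of df_1 as shown earlier; all values are strings because the
# CSV was loaded without schema inference.
rows = [
    {"event_date": "03/04/2019 19:48:06", "location_id": "loc0", "temp_celcius": "29"},
    {"event_date": "03/04/2019 19:53:06", "location_id": "loc0", "temp_celcius": "27"},
]

# Serialize: one JSON object per line.
json_lines = "\n".join(json.dumps(row) for row in rows)
print(json_lines)

# Parse: each line is decoded independently of the others.
parsed = [json.loads(line) for line in json_lines.splitlines()]
```

Because every line stands on its own, a file in this layout can be split and processed in parallel, which suits a distributed engine.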

                                                                                

We now repeat the loading step, this time reading the JSON output instead of the CSV file.

In [12]:
df_1_json = spark.read.format('json').load(json_df1_path)
df_1_json.show()



+-------------------+-----------+------------+
|         event_date|location_id|temp_celcius|
+-------------------+-----------+------------+
|03/04/2019 19:48:06|       loc0|          29|
|03/04/2019 19:53:06|       loc0|          27|
|03/04/2019 19:58:06|       loc0|          28|
|03/04/2019 20:03:06|       loc0|          30|
|03/04/2019 20:08:06|       loc0|          27|
|03/04/2019 20:13:06|       loc0|          27|
|03/04/2019 20:18:06|       loc0|          27|
|03/04/2019 20:23:06|       loc0|          29|
|03/04/2019 20:28:06|       loc0|          32|
|03/04/2019 20:33:06|       loc0|          35|
|03/04/2019 20:38:06|       loc0|          32|
|03/04/2019 20:43:06|       loc0|          28|
|03/04/2019 20:48:06|       loc0|          28|
|03/04/2019 20:53:06|       loc0|          32|
|03/04/2019 20:58:06|       loc0|          34|
|03/04/2019 21:03:06|       loc0|          33|
|03/04/2019 21:08:06|       loc0|          27|
|03/04/2019 21:13:06|       loc0|          28|
|03/04/2019 2

                                                                                

## Basic DataFrame Operations



In [13]:
# print out columns in dataframe
df_2.columns

['event_datetime',
 'server_id',
 'cpu_utilization',
 'free_memory',
 'session_count']

In [16]:
# Sampling from the dataframe
# the first argument (withReplacement) controls whether a row can be drawn more than once
# fraction is the expected share of rows to keep; the resulting row count is approximate
df_2_sample = df_2.sample(False, fraction=0.1)

df_2_sample.show()

+-------------------+---------+---------------+-----------+-------------+
|     event_datetime|server_id|cpu_utilization|free_memory|session_count|
+-------------------+---------+---------------+-----------+-------------+
|03/05/2019 08:11:14|      100|           0.47|       0.62|           43|
|03/05/2019 08:21:14|      100|           0.57|       0.56|           50|
|03/05/2019 09:06:14|      100|           0.66|       0.72|           57|
|03/05/2019 10:36:14|      100|           0.62|       0.36|           50|
|03/05/2019 15:56:14|      100|           0.63|       0.42|           61|
|03/05/2019 17:26:14|      100|           0.34|       0.61|           52|
|03/05/2019 18:26:14|      100|           0.44|       0.52|           72|
|03/05/2019 18:31:14|      100|           0.28|       0.72|           57|
|03/05/2019 18:56:14|      100|           0.52|       0.66|           47|
|03/05/2019 19:01:14|      100|           0.61|       0.45|           38|
|03/05/2019 20:21:14|      100|       

In [17]:
# sort by a column (event_datetime is a string here, so the order is lexicographic)
df_2_sorted = df_2_sample.sort('event_datetime')

df_2_sorted.show()

+-------------------+---------+---------------+-----------+-------------+
|     event_datetime|server_id|cpu_utilization|free_memory|session_count|
+-------------------+---------+---------------+-----------+-------------+
|03/05/2019 08:06:21|      104|           0.84|       0.36|           94|
|03/05/2019 08:06:26|      107|           0.64|       0.55|           56|
|03/05/2019 08:06:28|      108|           0.62|       0.41|           86|
|03/05/2019 08:06:46|      119|           0.51|       0.49|           53|
|03/05/2019 08:06:56|      125|           0.42|       0.37|           58|
|03/05/2019 08:07:06|      130|           0.52|       0.49|           77|
|03/05/2019 08:07:15|      135|           0.55|        0.5|           70|
|03/05/2019 08:11:14|      100|           0.47|       0.62|           43|
|03/05/2019 08:11:16|      101|           0.71|       0.18|           98|
|03/05/2019 08:11:17|      102|           0.59|       0.13|           70|
|03/05/2019 08:11:26|      107|       
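
One caveat about the sort above: `event_datetime` is stored as a string, so it is ordered character by character. Within a single day that matches chronological order, but it would not once the data spans several months or years. The sketch below illustrates the difference in plain Python. The timestamps are hypothetical, and the `MM/dd/yyyy` reading of the date is an assumption, since the data alone does not say whether the month or the day comes first.

```python
from datetime import datetime

# Hypothetical timestamps spanning several years, written in the same
# textual layout as event_datetime.
stamps = [
    "03/05/2019 08:06:21",
    "12/31/2018 23:59:59",
    "01/01/2020 00:00:00",
]

# Lexicographic order: compares character by character, leading field first.
as_text = sorted(stamps)

# Chronological order: parse first (MM/dd/yyyy is an assumption).
fmt = "%m/%d/%Y %H:%M:%S"
as_time = sorted(stamps, key=lambda s: datetime.strptime(s, fmt))

print(as_text)
print(as_time)
```

In PySpark, one way to get a chronological sort would be to convert the column with `to_timestamp` from `pyspark.sql.functions` and a matching pattern before sorting.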

                                                                                

In [18]:
# Filtering data in dataframe
df_1.filter(df_1["location_id"] == "loc0").show()

+-------------------+-----------+------------+
|         event_date|location_id|temp_celcius|
+-------------------+-----------+------------+
|03/04/2019 19:48:06|       loc0|          29|
|03/04/2019 19:53:06|       loc0|          27|
|03/04/2019 19:58:06|       loc0|          28|
|03/04/2019 20:03:06|       loc0|          30|
|03/04/2019 20:08:06|       loc0|          27|
|03/04/2019 20:13:06|       loc0|          27|
|03/04/2019 20:18:06|       loc0|          27|
|03/04/2019 20:23:06|       loc0|          29|
|03/04/2019 20:28:06|       loc0|          32|
|03/04/2019 20:33:06|       loc0|          35|
|03/04/2019 20:38:06|       loc0|          32|
|03/04/2019 20:43:06|       loc0|          28|
|03/04/2019 20:48:06|       loc0|          28|
|03/04/2019 20:53:06|       loc0|          32|
|03/04/2019 20:58:06|       loc0|          34|
|03/04/2019 21:03:06|       loc0|          33|
|03/04/2019 21:08:06|       loc0|          27|
|03/04/2019 21:13:06|       loc0|          28|
|03/04/2019 2

In [20]:
# Aggregating data

# group by location and count the rows in each group
df_1.groupBy("location_id").count().show()

+-----------+-----+
|location_id|count|
+-----------+-----+
|      loc22| 1000|
|      loc31| 1000|
|      loc82| 1000|
|      loc90| 1000|
|     loc118| 1000|
|      loc39| 1000|
|      loc75| 1000|
|     loc122| 1000|
|      loc24| 1000|
|      loc30| 1000|
|     loc105| 1000|
|      loc96| 1000|
|     loc102| 1000|
|      loc18| 1000|
|      loc27| 1000|
|     loc143| 1000|
|      loc43| 1000|
|     loc123| 1000|
|      loc15| 1000|
|      loc48| 1000|
+-----------+-----+
only showing top 20 rows



                                                                                

In [21]:
# ordering
df_1.orderBy("location_id").show(10)

+-------------------+-----------+------------+
|         event_date|location_id|temp_celcius|
+-------------------+-----------+------------+
|03/04/2019 19:48:06|       loc0|          29|
|03/04/2019 19:53:06|       loc0|          27|
|03/04/2019 19:58:06|       loc0|          28|
|03/04/2019 20:03:06|       loc0|          30|
|03/04/2019 20:08:06|       loc0|          27|
|03/04/2019 20:13:06|       loc0|          27|
|03/04/2019 20:18:06|       loc0|          27|
|03/04/2019 20:23:06|       loc0|          29|
|03/04/2019 20:28:06|       loc0|          32|
|03/04/2019 20:33:06|       loc0|          35|
+-------------------+-----------+------------+
only showing top 10 rows



In [22]:
# finding average
df_1.groupBy("location_id").agg({'temp_celcius': 'mean'}).show()

+-----------+-----------------+
|location_id|avg(temp_celcius)|
+-----------+-----------------+
|      loc22|           28.251|
|      loc31|           25.196|
|      loc82|           27.355|
|      loc90|           23.216|
|     loc118|           24.219|
|      loc39|           25.199|
|      loc75|           23.209|
|     loc122|            32.36|
|      loc24|           31.109|
|      loc30|           30.211|
|     loc105|           26.217|
|      loc96|           28.138|
|     loc102|           30.327|
|      loc18|           30.198|
|      loc27|           31.239|
|     loc143|           28.213|
|      loc43|           32.196|
|     loc123|           23.424|
|      loc15|           32.171|
|      loc48|           30.244|
+-----------+-----------------+
only showing top 20 rows



                                                                                

In [25]:
# find maximum
df_1.groupby('location_id').agg({'temp_celcius': 'max'}).show()

+-----------+-----------------+
|location_id|max(temp_celcius)|
+-----------+-----------------+
|       loc0|               36|
|       loc1|               35|
|      loc10|               32|
|     loc100|               34|
|     loc101|               32|
|     loc102|               37|
|     loc103|               32|
|     loc104|               33|
|     loc105|               33|
|     loc106|               34|
|     loc107|               40|
|     loc108|               39|
|     loc109|               31|
|      loc11|               32|
|     loc110|               33|
|     loc111|               38|
|     loc112|               40|
|     loc113|               37|
|     loc114|               36|
|     loc115|               30|
+-----------+-----------------+
only showing top 20 rows
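
A word of caution on the maximum above: `df_1` was loaded without `inferSchema`, so `temp_celcius` is a string column (see `temp_celcius='29'` in the `Row` output earlier), and strings are compared as text. The two orderings agree when every value has the same number of digits, but they diverge otherwise. The readings below are hypothetical and chosen only to show the difference.

```python
# Hypothetical readings stored as strings, including a single-digit value.
readings = ["36", "9", "40", "27"]

text_max = max(readings)                     # compares characters: '9' wins
numeric_max = max(int(r) for r in readings)  # compares numbers: 40 wins

print(text_max, numeric_max)
```

Casting the column to a numeric type before aggregating, for example with `df_1["temp_celcius"].cast("int")`, or loading the file with `inferSchema` enabled, would avoid the issue.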



In [28]:
df_s1 = df_1.sample(fraction=0.1, withReplacement=False)
df_s1.count()

50040
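
The count is 50040 rather than exactly 50000 because `fraction` is a per-row probability, not a guaranteed sample size. Sampling without replacement behaves like an independent keep-or-drop decision for each row. The plain-Python sketch below simulates that idea; the seed is arbitrary, and the sketch is not Spark's sampler.

```python
import random

TOTAL_ROWS = 500_000  # row count reported by df_1.count()
FRACTION = 0.1

rng = random.Random(42)  # arbitrary seed so the sketch is repeatable

# Keep each row independently with probability FRACTION.
kept = sum(1 for _ in range(TOTAL_ROWS) if rng.random() < FRACTION)

print(kept)  # close to, but usually not exactly, 50000
```

Passing a `seed` argument to `sample` makes the Spark sample repeatable for the same data and partitioning.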

In [30]:
df_s1.groupBy("location_id").agg({'temp_celcius': 'mean'}).show(10)

+-----------+------------------+
|location_id| avg(temp_celcius)|
+-----------+------------------+
|      loc22| 27.96590909090909|
|      loc31|25.207207207207208|
|      loc82|27.233644859813083|
|      loc90| 23.16504854368932|
|     loc118| 24.30275229357798|
|      loc39| 25.27777777777778|
|      loc75|23.341463414634145|
|     loc122| 32.31683168316832|
|      loc24|             31.25|
|      loc30|30.214285714285715|
+-----------+------------------+
only showing top 10 rows



In [31]:
df_s1.groupBy("location_id").agg(
    {'temp_celcius': 'mean'}).orderBy("location_id").show(10)

+-----------+------------------+
|location_id| avg(temp_celcius)|
+-----------+------------------+
|       loc0| 29.59259259259259|
|       loc1|28.425742574257427|
|      loc10|25.383838383838384|
|     loc100|27.540816326530614|
|     loc101|25.515151515151516|
|     loc102|30.145299145299145|
|     loc103|25.326923076923077|
|     loc104| 25.91860465116279|
|     loc105| 26.23469387755102|
|     loc106|26.813725490196077|
+-----------+------------------+
only showing top 10 rows



In [32]:
df_1.groupBy("location_id").agg(
    {'temp_celcius': 'mean'}).orderBy("location_id").show(10)

+-----------+-----------------+
|location_id|avg(temp_celcius)|
+-----------+-----------------+
|       loc0|           29.176|
|       loc1|           28.246|
|      loc10|           25.337|
|     loc100|           27.297|
|     loc101|           25.317|
|     loc102|           30.327|
|     loc103|           25.341|
|     loc104|           26.204|
|     loc105|           26.217|
|     loc106|           27.201|
+-----------+-----------------+
only showing top 10 rows



### Saving a DataFrame

In [34]:
df_1.write.csv('./data/df1.csv')

                                                                                

In [35]:
! ls data/df1.csv

_SUCCESS
part-00000-ad92fee7-c79c-4887-ad74-6f41adfeca67-c000.csv
part-00001-ad92fee7-c79c-4887-ad74-6f41adfeca67-c000.csv
part-00002-ad92fee7-c79c-4887-ad74-6f41adfeca67-c000.csv
part-00003-ad92fee7-c79c-4887-ad74-6f41adfeca67-c000.csv
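
The listing above shows that `df_1.write.csv` produced a directory containing several part files and a `_SUCCESS` marker, not a single CSV file. Spark writes one file per partition of the DataFrame. The plain-Python sketch below loosely mimics that layout with two hypothetical partitions, using rows printed earlier in this notebook. The file names are simplified and are not the names Spark generates.

```python
import glob
import os
import tempfile

# Rows shown in this notebook, split into two hypothetical partitions.
partitions = [
    ["03/04/2019 19:48:06,loc0,29", "03/04/2019 19:53:06,loc0,27"],
    ["03/04/2019 19:58:06,loc0,28"],
]

with tempfile.TemporaryDirectory() as tmp:
    out_dir = os.path.join(tmp, "df1.csv")  # a directory, despite the name
    os.mkdir(out_dir)

    # One part file per partition.
    for i, rows in enumerate(partitions):
        path = os.path.join(out_dir, f"part-{i:05d}.csv")
        with open(path, "w") as fh:
            fh.write("\n".join(rows) + "\n")

    # Reading the "dataset" back means reading every part file.
    part_files = sorted(glob.glob(os.path.join(out_dir, "part-*.csv")))
    restored = []
    for path in part_files:
        with open(path) as fh:
            restored.extend(line.rstrip("\n") for line in fh)

print(len(part_files), restored)
```

In Spark, pointing the reader at the directory path (here `./data/df1.csv`) loads all part files as one DataFrame.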


In [36]:
! head data/df1.csv/part-00000-ad92fee7-c79c-4887-ad74-6f41adfeca67-c000.csv

03/04/2019 19:48:06,loc0,29
03/04/2019 19:53:06,loc0,27
03/04/2019 19:58:06,loc0,28
03/04/2019 20:03:06,loc0,30
03/04/2019 20:08:06,loc0,27
03/04/2019 20:13:06,loc0,27
03/04/2019 20:18:06,loc0,27
03/04/2019 20:23:06,loc0,29
03/04/2019 20:28:06,loc0,32
03/04/2019 20:33:06,loc0,35
