## **Setup**

In [1]:
!pip install pyspark
!pip install findspark
!pip install pandas

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488493 sha256=dce2766ff7c416bfab2299388d5816f6cc771558160fcdb4d8070d1fbd186239
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [2]:
import findspark
findspark.init()

In [4]:
import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

# **1 - Spark session**

# Task 1: Creating the spark session and context

In [6]:
# Creating a spark session
spark = SparkSession \
    .builder \
    .appName("Python Spark DataFrames basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

# Task 2: Initialize Spark session

In [7]:
spark

# **2 - Load the data and Spark dataframe**

# Loading data into a Pandas DataFrame

In [10]:
# Read the file using `read_csv` function in pandas
mtcars = pd.read_csv('https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/mtcars.csv')

In [11]:
#preview a few records
mtcars.head()

Unnamed: 0.1,Unnamed: 0,mpg,cyl,disp,hp,drat,wt,qsec,vs,am,gear,carb
0,Mazda RX4,21.0,6,160.0,110,3.9,2.62,16.46,0,1,4,4
1,Mazda RX4 Wag,21.0,6,160.0,110,3.9,2.875,17.02,0,1,4,4
2,Datsun 710,22.8,4,108.0,93,3.85,2.32,18.61,1,1,4,1
3,Hornet 4 Drive,21.4,6,258.0,110,3.08,3.215,19.44,1,0,3,1
4,Hornet Sportabout,18.7,8,360.0,175,3.15,3.44,17.02,0,0,3,2


# Loading data into a Spark DataFrame

In [12]:
sdf = spark.createDataFrame(mtcars)

In [13]:
sdf.printSchema()

root
 |-- Unnamed: 0: string (nullable = true)
 |-- mpg: double (nullable = true)
 |-- cyl: long (nullable = true)
 |-- disp: double (nullable = true)
 |-- hp: long (nullable = true)
 |-- drat: double (nullable = true)
 |-- wt: double (nullable = true)
 |-- qsec: double (nullable = true)
 |-- vs: long (nullable = true)
 |-- am: long (nullable = true)
 |-- gear: long (nullable = true)
 |-- carb: long (nullable = true)



# **3: Basic data analysis and manipulation**

# Displays the content of the DataFrame

In [14]:
sdf.show(5)

+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|       Unnamed: 0| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|        Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|
|    Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|
|       Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
|   Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1|
|Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2|
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
only showing top 5 rows



In [15]:
sdf.select('mpg').show(5)

+----+
| mpg|
+----+
|21.0|
|21.0|
|22.8|
|21.4|
|18.7|
+----+
only showing top 5 rows



In [16]:
sdf.filter(sdf['mpg']<18).show(5)

+-----------+----+---+-----+---+----+----+-----+---+---+----+----+
| Unnamed: 0| mpg|cyl| disp| hp|drat|  wt| qsec| vs| am|gear|carb|
+-----------+----+---+-----+---+----+----+-----+---+---+----+----+
| Duster 360|14.3|  8|360.0|245|3.21|3.57|15.84|  0|  0|   3|   4|
|  Merc 280C|17.8|  6|167.6|123|3.92|3.44| 18.9|  1|  0|   4|   4|
| Merc 450SE|16.4|  8|275.8|180|3.07|4.07| 17.4|  0|  0|   3|   3|
| Merc 450SL|17.3|  8|275.8|180|3.07|3.73| 17.6|  0|  0|   3|   3|
|Merc 450SLC|15.2|  8|275.8|180|3.07|3.78| 18.0|  0|  0|   3|   3|
+-----------+----+---+-----+---+----+----+-----+---+---+----+----+
only showing top 5 rows



basic arithmetic functions to convert the weight values from lb to metric ton. We create a new column called wtTon that has the weight from the wt column converted to metric tons.

In [17]:
sdf.withColumn('wton', sdf['wt'] * 0.45).show(5)

+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+-------+
|       Unnamed: 0| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|   wton|
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+-------+
|        Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|  1.179|
|    Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|1.29375|
|       Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|  1.044|
|   Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1|1.44675|
|Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2|  1.548|
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+-------+
only showing top 5 rows



Rename the existing column name "vs" to "versus" and assign the new result DataFrame to a variable, "sdf_new".

In [18]:
sdf_new = sdf.withColumnRenamed("vs","versus")

Filter the records based on the condition

In [19]:
sdf.where(sdf['mpg'] < 18).show(3)

+----------+----+---+-----+---+----+----+-----+---+---+----+----+
|Unnamed: 0| mpg|cyl| disp| hp|drat|  wt| qsec| vs| am|gear|carb|
+----------+----+---+-----+---+----+----+-----+---+---+----+----+
|Duster 360|14.3|  8|360.0|245|3.21|3.57|15.84|  0|  0|   3|   4|
| Merc 280C|17.8|  6|167.6|123|3.92|3.44| 18.9|  1|  0|   4|   4|
|Merc 450SE|16.4|  8|275.8|180|3.07|4.07| 17.4|  0|  0|   3|   3|
+----------+----+---+-----+---+----+----+-----+---+---+----+----+
only showing top 3 rows



Combining DataFrames based on a specific condition.

In [20]:
# define sample DataFrame 1

data = [("A101", "John"), ("A102", "Peter"), ("A103", "Charlie")]

columns = ["emp_id", "emp_name"]

dataframe_1 = spark.createDataFrame(data, columns)

In [21]:
# define sample DataFrame 2

data = [("A101", 1000), ("A102", 2000), ("A103", 3000)]

columns = ["emp_id", "salary"]

dataframe_2 = spark.createDataFrame(data, columns)

In [22]:
# create a new DataFrame, "combined_df" by performing an inner join

combined_df = dataframe_1.join(dataframe_2, on="emp_id", how="inner")

Filling the missing values

In [23]:
# define sample DataFrame 1

data = [("A101", 1000), ("A102", 2000), ("A103",None)]

columns = ["emp_id", "salary"]

dataframe_1 = spark.createDataFrame(data, columns)

In [24]:
# fill missing salary value with a specified value

filled_df = dataframe_1.fillna({"salary": 3000})
filled_df.head(3)

[Row(emp_id='A101', salary=1000),
 Row(emp_id='A102', salary=2000),
 Row(emp_id='A103', salary=3000)]

# **4: Grouping and Aggregation**

In [25]:
sdf.groupby(['cyl'])\
.agg({"wt": "AVG"})\
.show(5)

+---+-----------------+
|cyl|          avg(wt)|
+---+-----------------+
|  6|3.117142857142857|
|  8|3.999214285714286|
|  4|2.285727272727273|
+---+-----------------+



In [26]:
car_counts = sdf.groupby(['cyl'])\
.agg({"wt": "count"})\
.sort("count(wt)", ascending=False)\
.show(5)


+---+---------+
|cyl|count(wt)|
+---+---------+
|  8|       14|
|  4|       11|
|  6|        7|
+---+---------+



In [28]:
spark.stop()
