In [None]:
# Installing required packages
!pip install pyspark
!pip install findspark
!pip install pandas

In [None]:
import findspark
# Let findspark locate the Spark installation so that `pyspark` can be imported;
# this runs before the pyspark imports in the next cell.
findspark.init()

In [None]:
import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

In [None]:
# Build (or reuse, if one is already active) the SparkSession: the entry point
# for the DataFrame and SQL APIs used in the rest of the notebook.
# NOTE(review): "spark.some.config.option" looks like a placeholder key copied
# from an example; confirm whether it is needed.
spark = (
    SparkSession.builder
    .appName("Python Spark DataFrames basic example")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

In [None]:
# Display the active SparkSession as this cell's output.
spark

In [None]:
# Read the mtcars CSV with pandas. Its first column (the car names) has no
# header, so pandas labels it 'Unnamed: 0'. Rename it to 'name' so the Spark
# column created from it later can be referenced without backtick-quoting.
mtcars = pd.read_csv(
    'https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/mtcars.csv'
).rename(columns={'Unnamed: 0': 'name'})

In [None]:
# Peek at the first five rows of the pandas frame.
mtcars.head(n=5)

In [None]:
# Turn the pandas frame into a Spark DataFrame; no schema is passed, so Spark
# infers the column types itself.
sdf = spark.createDataFrame(data=mtcars)

In [None]:
# Print the schema (column names, types, nullability) that Spark inferred for sdf.
sdf.printSchema()

In [None]:
# Print the first five rows of the Spark DataFrame.
sdf.show(n=5)

In [None]:
# Project just the mpg column and print its first five values.
sdf.select(sdf['mpg']).show(n=5)

In [None]:
# Keep only rows whose mpg value is below 18 and print five of them.
sdf.filter(sdf.mpg < 18).show(n=5)

In [None]:
# Add a derived column wtTon = wt * 0.45 and print five rows (sdf is unchanged).
# NOTE(review): presumably wt is in units of 1000 lb, so wtTon is roughly metric
# tonnes; confirm the units.
sdf.withColumn('wtTon', sdf.wt * 0.45).show(n=5)

In [None]:
# Build a new DataFrame whose "vs" column is renamed to "versus"; sdf itself is not modified.
sdf_new = sdf.withColumnRenamed("vs", "versus")

In [None]:
# `where` takes the same predicate as `filter`; print three rows with mpg below 18.
sdf.where(sdf.mpg < 18).show(n=3)

In [None]:
# Sample DataFrame 1: employee ids paired with employee names.
columns = ["emp_id", "emp_name"]
data = [
    ("A101", "John"),
    ("A102", "Peter"),
    ("A103", "Charlie"),
]
dataframe_1 = spark.createDataFrame(data, schema=columns)

In [None]:
# Sample DataFrame 2: the same employee ids paired with salaries.
columns = ["emp_id", "salary"]
data = [
    ("A101", 1000),
    ("A102", 2000),
    ("A103", 3000),
]
dataframe_2 = spark.createDataFrame(data, schema=columns)

In [None]:
# Inner-join the two sample frames on emp_id, so each employee row also carries its salary.
combined_df = dataframe_1.join(dataframe_2, "emp_id", "inner")

In [None]:
# Redefine dataframe_1 (replacing the name/emp frame above) as salary data in
# which A103's salary is missing, to demonstrate null filling in the next cell.
columns = ["emp_id", "salary"]
data = [
    ("A101", 1000),
    ("A102", 2000),
    ("A103", None),
]
dataframe_1 = spark.createDataFrame(data, schema=columns)

In [None]:
# Replace nulls in the salary column with 3000, then return the first three rows
# (a list of Row objects) as the cell output.
filled_df = dataframe_1.na.fill({"salary": 3000})
filled_df.head(3)

In [None]:
# Average wt per cyl group; print up to five groups.
(
    sdf.groupBy('cyl')
    .agg({"wt": "AVG"})
    .show(n=5)
)

In [None]:
# Count cars per cyl group, most common group first.
# `.show()` only prints and returns None, so keep the DataFrame in car_counts
# and print it separately; the aggregated result stays usable in later cells.
car_counts = (
    sdf.groupby(['cyl'])
    .agg({"wt": "count"})
    .sort("count(wt)", ascending=False)
)
car_counts.show(5)


# Introduction to Spark SQL

In [None]:
# Get the SparkContext. The SparkSession created earlier in this notebook has
# already started one, and calling SparkContext() a second time raises
# "Cannot run multiple SparkContexts at once"; getOrCreate() reuses the
# running context instead (and creates one if none exists).
sc = SparkContext.getOrCreate()

# Get the SparkSession; getOrCreate() returns the already-active session if there is one.
spark = SparkSession \
    .builder \
    .appName("Python Spark DataFrames basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [None]:
# Display the active SparkSession as this cell's output.
spark

In [None]:
# Re-read the raw mtcars CSV with pandas for the Spark SQL section.
mtcars = pd.read_csv(
    filepath_or_buffer='https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/mtcars.csv'
)

In [None]:
# Peek at the first five rows of the freshly loaded pandas frame.
mtcars.head(n=5)

In [None]:
# Give the header-less first column (car names, labelled 'Unnamed: 0' by pandas)
# the name 'name'. Rebinding the result instead of using inplace=True follows
# pandas' recommended style and keeps the transformation explicit.
mtcars = mtcars.rename(columns={'Unnamed: 0': 'name'})

In [None]:
# Build the Spark DataFrame from the renamed pandas frame (schema is inferred).
sdf = spark.createDataFrame(data=mtcars)

In [None]:
# Print the inferred schema; the car-name column should now appear as "name".
sdf.printSchema()

In [None]:
# New DataFrame with "vs" renamed to "versus"; sdf (used for the SQL view below) keeps "vs".
sdf_new = sdf.withColumnRenamed("vs", "versus")

In [None]:
# Fetch the first five rows of sdf_new to the driver as a list of Row objects.
sdf_new.take(5)

In [None]:
# Register sdf as the temporary view "cars" so it can be queried with SQL.
# createOrReplaceTempView (unlike createTempView) does not raise when the view
# already exists, so this cell can be re-run safely.
sdf.createOrReplaceTempView("cars")

In [None]:
# Show the whole table. DataFrame.show() prints only 20 rows by default, so pass
# the view's row count explicitly to print every row.
all_cars = spark.sql("SELECT * FROM cars")
all_cars.show(all_cars.count())

In [None]:
# import the Pandas UDF function 
from pyspark.sql.functions import pandas_udf, PandasUDFType

In [None]:
@pandas_udf("float")
def convert_wt(s: pd.Series) -> pd.Series:
    """Convert weights from imperial to metric tons by scaling with 0.45.

    Spark hands this pandas UDF a pandas Series of column values and expects a
    Series back; the declared Spark result type is single-precision "float".
    """
    metric_factor = 0.45
    return s.mul(metric_factor)

spark.udf.register("convert_weight", convert_wt)

In [None]:
# Query the "cars" view with the registered pandas UDF, listing each row's wt
# next to its converted value. show() prints the first 20 rows by default.
spark.sql("SELECT *, wt AS weight_imperial, convert_weight(wt) as weight_metric FROM cars").show()

# Connecting to a Spark cluster

In [None]:
!pip install pyspark==3.1.2 -q
!pip install findspark -q

In [None]:
# You can also use this section to suppress warnings generated by your code:
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')

# FindSpark simplifies the process of using Apache Spark with Python:
# init() locates the Spark installation so the pyspark import below succeeds.

import findspark
findspark.init()

# import SparkSession
from pyspark.sql import SparkSession

In [None]:
# Create the SparkSession for this section; 'Getting Started with Spark' is the
# application name.
# NOTE(review): getOrCreate() returns an already-active session if one exists
# (earlier sections create one), in which case this appName may not be applied.
# The SparkSession command may print startup warnings; they can be ignored.

spark = SparkSession.builder.appName("Getting Started with Spark").getOrCreate()

In [None]:
!wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0231EN-SkillsNetwork/datasets/mpg.csv


In [None]:
# Load the mpg dataset into a Spark DataFrame:
#   header=True      -> the first row of our CSV file holds the column names
#   inferSchema=True -> Spark works out each column's data type automatically
mpg_data = spark.read.options(header=True, inferSchema=True).csv("mpg.csv")


In [None]:
# Print the column names and the types Spark inferred from mpg.csv.
mpg_data.printSchema()

In [None]:
# Fetch the first five rows of the dataset to the driver (a list of Row objects).
mpg_data.take(5)

In [None]:
# Shut down the SparkSession (and its underlying SparkContext) at the end of the notebook.
spark.stop()