# PySpark 101

PySpark is the Python API for Apache Spark. It enables you to process massive amounts of data in a relatively short time by distributing the work across many machines (or many cores).
In short, it brings large-scale data processing in a distributed environment to Python.




In [1]:
# The modules below configure the environment so that Spark can be driven from Python.

import os
import sys
import findspark

# Make the Spark driver and the Python worker processes use the same
# interpreter as this notebook, avoiding driver/worker Python mismatches.
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

# findspark only has an effect once init() is called: it locates the Spark
# installation (SPARK_HOME or a pip-installed pyspark) and adds it to sys.path,
# so that `import pyspark` in the next cell works.
findspark.init()

# Python's garbage-collector interface. Note that gc only manages objects that
# live in this Python process; memory inside Spark's Java Virtual Machine (JVM)
# is reclaimed by the JVM's own garbage collector. Calling gc.collect() may
# still help by dropping Python-side handles (e.g. to large collected results)
# that keep JVM objects referenced.
import gc


In [None]:

# Build (or reuse an existing) SparkSession - the entry point to the DataFrame
# and SQL APIs used throughout this notebook.
from pyspark.sql import SparkSession

# "local[1]" runs Spark inside this machine with a single worker thread.
session_builder = SparkSession.builder.master("local[1]").appName("SparkByExamples.com")
spark = session_builder.getOrCreate()



## PySpark RDD 

Resilient Distributed Datasets **(RDDs)** are one of the fundamental data structures in Spark. They can hold both structured and unstructured data, but they carry no schema. Because RDDs share data in memory rather than over the network or through disk, data sharing with RDDs can be 10 to 100 times faster.

**Some Features:**
1. In-memory computation
2. Lazy evaluation
3. Immutable (read-only)
4. Cacheable / persistable
5. Partitioned
6. Parallel
7. Fault tolerant
8. Location stickiness (data locality)


In [2]:
# RDD basics: distribute a local list, transform it, and bring the result back.

# parallelize() turns a local Python list into an RDD.
numbers = [1, 2, 3, 4, 5]
rdd = spark.sparkContext.parallelize(numbers)

# map() is a lazy transformation: it only describes the work to do.
squared_rdd = rdd.map(lambda n: n * n)

# collect() is an action: it runs the job and returns a Python list.
result = squared_rdd.collect()
print(result)




[1, 4, 9, 16, 25]


In [None]:


# A small RDD of (item, cost) pairs. "Dog food" appears twice so that the
# key-based operations below (reduceByKey) have records to combine.
data = [
    ("Dog food", 25),
    ("Cat food", 30),
    ("Dog food", 35),
    ("Bear food", 40),
]
rdd2 = spark.sparkContext.parallelize(data)

# In a notebook, the value of the cell's last expression is displayed.
rdd2.collect()

In [None]:
#Transformations - operations that create a new RDD from an existing one.
#Transformations are lazy: nothing is computed until an action is called.

In [None]:
# map(): upper-case the item name of every (item, cost) pair, keeping the cost.
upper_case_item = lambda pair: (pair[0].upper(), pair[1])
mapped_rdd = rdd2.map(upper_case_item)

result = mapped_rdd.collect()
print("TRANSFORMED DATA:", result)

In [None]:
# Filter transformation: keep only the records whose cost is greater than 30.
filtered_rdd = rdd2.filter(lambda x: x[1] > 30)
# Only the last expression of a notebook cell is displayed, so print each
# intermediate result explicitly instead of discarding it.
print("FILTERED DATA:", filtered_rdd.collect())

# reduceByKey transformation: sum the cost of food bought for each animal
# (the two "Dog food" records are combined into a single total).
reduced_rdd = rdd2.reduceByKey(lambda x, y: x + y)
print("TOTAL PER ITEM:", reduced_rdd.collect())

# sortBy transformation: order the per-item totals from highest to lowest.
sorted_rdd = reduced_rdd.sortBy(lambda x: x[1], ascending=False)
print("SORTED TOTALS:", sorted_rdd.collect())

In [None]:

# Actions run the pending computation and return values to the driver.
count = rdd2.count()            # number of elements in the RDD
first_element = rdd2.first()    # the first element of the RDD
taken_elements = rdd2.take(2)   # the first 2 elements, as a list

for label, value in (
    ("The total number of elements in the rdd", count),
    ("First elements in the rdd", first_element),
    ("First 2 elements in the rdd", taken_elements),
):
    print(label, value)

In [None]:
# Import data.
#
# A raw string (r"...") keeps Windows backslashes literal; in a normal string,
# sequences such as "\h" are invalid escapes and trigger a SyntaxWarning on
# recent Python versions. Replace the placeholder directory with your own.
HEART_CSV = r"F:\.....\PySpark\heart.csv"

# 1) Read with only the header option: all columns are of type string.
df = spark.read.option('header', 'true').csv(HEART_CSV)

# Spark has its own data types; list every column name and its type.
for field in df.schema.fields:
    print(field.name + " , " + str(field.dataType))

# The type of a single column.
print(df.schema["Age"].dataType)

# All column names of the DataFrame.
print(df.columns)

# 2) Tell PySpark the column types up front with a DDL schema string. This
#    skips type inference, which saves time on large datasets. Note that the
#    schema is applied by position, and only the fields listed here are kept.
schema = 'Age INTEGER, Sex STRING, ChestPainType STRING'
df = spark.read.csv(HEART_CSV, schema=schema, header=True)
for field in df.schema.fields:
    print(field.name + " , " + str(field.dataType))

# 3) Let PySpark infer the schema (costs an extra pass over the data).
df = spark.read.csv(HEART_CSV, inferSchema=True, header=True)
for field in df.schema.fields:
    print(field.name + " , " + str(field.dataType))

# 4) nullValue: cells containing the string 'NA' are read as null.
#    header=True is still needed; without it the header row is read as data
#    and the columns are named _c0, _c1, ...
df = spark.read.csv(HEART_CSV, nullValue='NA', header=True)

df.printSchema()
df.show()

# Spark writes a directory of part files. The default save mode fails if the
# target already exists, so use "overwrite" to allow re-running this cell.
df.write.mode("overwrite").csv("F:/....../PySpark/heart_save")

In [None]:

# Save data.
#
# Spark's writer creates a *directory* of part files at the given path (even
# when the name ends in ".csv"). The pandas copy below therefore goes to a
# different path: writing a file where Spark created a directory would fail.
# mode("overwrite") lets the cell be re-run. Raw strings keep the Windows
# backslashes literal.
df.write.mode("overwrite").csv(r'F:\.....\PySpark\heart_save.csv', header=True)

# toPandas() collects the whole DataFrame onto the driver - only use it for
# data that fits in local memory. index=False avoids writing pandas' row
# index as an extra unnamed column.
df.toPandas().to_csv(r"F:\......\PySpark\heart_save_pandas.csv", index=False)

## Spark Dataframe

In [None]:


# Create a Spark DataFrame from a list of tuples and a list of column names;
# Spark infers each column's type from the Python values.
data = [
    ('James', '', 'Smith', '1991-04-01', 'M', 3000),
    ('Michael', 'Rose', '', '2000-05-19', 'M', 4000),
    ('Robert', '', 'Williams', '1978-09-05', 'M', 4000),
    ('Maria', 'Anne', 'Jones', '1967-12-01', 'F', 4000),
    ('Jen', 'Mary', 'Brown', '1980-02-17', 'F', -1),
]

columns = ["firstname", "middlename", "lastname", "dob", "gender", "salary"]
df = spark.createDataFrame(data=data, schema=columns)

# show(n) prints the first n rows as a table.
df.show(3)

# collect() and take(n) return Row objects to the driver; count() returns the
# number of rows. Only a cell's last expression is displayed in a notebook,
# so print these results explicitly.
print(df.collect())
print("Number of rows:", df.count())
print(df.take(2))

df.printSchema()

# DataFrame.display() only exists in Databricks notebooks and raises
# AttributeError in open-source PySpark; show() is the portable equivalent.
df.show()

In [None]:
# Combine DataFrames: two batches of sales data with identical columns.
sales_columns = ["product", "quantity"]
sales_data1 = [("apple", 3), ("banana", 5), ("orange", 2)]
sales_data2 = [("apple", 4), ("banana", 3), ("orange", 6)]

df1 = spark.createDataFrame(sales_data1, sales_columns)
df2 = spark.createDataFrame(sales_data2, sales_columns)

# union() stacks the rows, matching columns by position.
merged_df = df1.union(df2)
merged_df.show()

# unionByName() stacks the rows, matching columns by name.
merged_df = df1.unionByName(df2)
merged_df.show()

In [None]:
# Add a derived column to a Spark DataFrame.
from pyspark.sql.functions import col, when

people = [("sue", 32), ("li", 3), ("bob", 75), ("heo", 13)]
df = spark.createDataFrame(people, ["first_name", "age"])
df.show()

# life_stage is chosen by the first matching condition; between() includes
# both bounds, so ages 13 through 19 are teenagers.
is_child = col("age") < 13
is_teenager = col("age").between(13, 19)
life_stage = when(is_child, "child").when(is_teenager, "teenager").otherwise("adult")

df1 = df.withColumn("life_stage", life_stage)
df1.show()


In [None]:
# Filter a Spark DataFrame: keep only teenagers and adults.
df1.where(col("life_stage").isin(["teenager", "adult"])).show()

# Group-by aggregation on a Spark DataFrame.
from pyspark.sql.functions import avg

# Average age over all rows.
df1.select(avg("age")).show()

# Average age for each life_stage; avg() with no arguments averages every
# numeric column of each group.
df1.groupBy("life_stage").avg().show()

# Select a subset of columns. `df` here is the (first_name, age) DataFrame
# created above. It has no firstname/middlename/Sex columns, so selecting
# those would raise an AnalysisException.
df.select("first_name", "age").show(2)

# Show parts of the table: a single column, or a list of columns.
df.select('age').show(3)
df.select(['first_name', 'age']).show(3)

In [None]:
# Run SQL queries in PySpark.
# Register the DataFrame as a temporary view named "df" so that SQL can refer
# to it (createOrReplaceTempView supersedes the deprecated registerTempTable).
df.createOrReplaceTempView("df")

# spark.sql() returns a DataFrame, while show() only prints and returns None.
# Keep the DataFrame in query_df, then display it.
query_df = spark.sql("select * from df")
query_df.show(3)

# Filter with a WHERE clause: return the row(s) for the person named 'sue'.
spark.sql("select * from df where first_name='sue'").show()

## Spark Machine Learning

In [None]:
#modules for machine learning in pyspark

from pyspark import SparkFiles
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
# Import data.
# SparkFiles.get() looks up the name of a file that was previously shipped
# with SparkContext.addFile(); it is not meant for a plain local path. So the
# path is passed straight to the CSV reader. A raw string keeps the Windows
# backslashes literal.
data = spark.read.csv(r"F:\........\PySpark\BostonHousing.csv", header=True, inferSchema=True)
data.show(5)
data.printSchema()

In [None]:
# Assemble the predictor columns into a single vector column named "features",
# the input format expected by Spark ML estimators such as LinearRegression.
# NOTE(review): if the CSV also has a "B" column, it is intentionally not used
# here - confirm.
assembler = VectorAssembler(
    inputCols=["CRIM", "ZN", "INDUS", "CHAS", "NOX", "RM", "AGE", "DIS", "RAD", "TAX", "PTRATIO", "LSTAT"],
    outputCol="features",
)

# Keep the raw `data` DataFrame unchanged. Overwriting it with the transformed
# frame made this cell fail on re-run, because the "features" output column
# would already exist.
assembled_data = assembler.transform(data)
final_data = assembled_data.select("features", "MEDV")

In [None]:

# Hold out 20% of the rows for testing; the fixed seed makes the split repeatable.
split_weights = [0.8, 0.2]
train_data, test_data = final_data.randomSplit(split_weights, seed=42)

# Fit a linear regression of MEDV on the assembled feature vector.
lr_columns = dict(featuresCol="features", labelCol="MEDV", predictionCol="predicted_medv")
lr = LinearRegression(**lr_columns)
lr_model = lr.fit(train_data)

# Score the held-out rows; the model adds a "predicted_medv" column.
predictions = lr_model.transform(test_data)

In [None]:
# Model evaluation metrics on the held-out test predictions.
def _medv_evaluator(metric_name):
    """Return an evaluator comparing MEDV with predicted_medv using *metric_name*."""
    return RegressionEvaluator(labelCol="MEDV", predictionCol="predicted_medv", metricName=metric_name)


# RMSE: typical size of a prediction error, in the units of MEDV.
evaluator = _medv_evaluator("rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data: {:.3f}".format(rmse))

# R2: share of the variance in MEDV explained by the model.
evaluator_r2 = _medv_evaluator("r2")
r2 = evaluator_r2.evaluate(predictions)
print("R-squared (R2) on test data: {:.3f}".format(r2))


In [None]:

# Free cached data, then shut down the Spark session.
# The order matters: once the session is stopped its catalog can no longer be
# used, so calling clearCache() after stop() raises an error.
spark.catalog.clearCache()
spark.stop()
