
In [None]:
# Step 1: download Apache Spark 3.2.0 (built for Hadoop 3.2).
# For a newer release, look up the file name at https://spark.apache.org/downloads.html and update it below.
# Releases that are no longer current are moved to https://archive.apache.org/dist/spark/
!wget -q https://dlcdn.apache.org/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz
# Step 2: extract the downloaded archive.
!tar xf spark-3.2.0-bin-hadoop3.2.tgz
# Step 3: install findspark, which locates the Spark installation
# and makes pyspark importable like a regular library.
!pip install -q findspark

In [None]:
import os
os.environ["SPARK_HOME"] = "/content/spark-3.2.0-bin-hadoop3.2"
print(os.environ['SPARK_HOME'])

/content/spark-3.2.0-bin-hadoop3.2


In [None]:
import findspark
findspark.init()
 
# import pyspark only after findspark.init() has run
import pyspark
from pyspark.sql import SparkSession

# Creating the SparkSession also creates the SparkContext.
spark = SparkSession.builder.getOrCreate()

# Keep a handle to the SparkContext; the RDD cells below use `sc`.
sc = spark.sparkContext
print(pyspark.__version__)


3.2.0


In [None]:
# to know the location where Spark is installed, use findspark.find()
findspark.find()

'/content/spark-3.2.0-bin-hadoop3.2'

In [None]:

# Check the pyspark version
import pyspark
print(pyspark.__version__)

3.2.0


In [None]:
spark

In [None]:
sc

''

In [None]:
df = spark.sql('''Select 'Spark' as hello ''')
df.show()

+-----+
|hello|
+-----+
|Spark|
+-----+



# Read CSV

In [None]:
df = spark.read.csv(header=True, inferSchema=True, path="/content/sample_data/california_housing_test.csv")
df.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -122.05|   37.37|              27.0|     3885.0|         661.0|    1537.0|     606.0|       6.6085|          344700.0|
|   -118.3|   34.26|              43.0|     1510.0|         310.0|     809.0|     277.0|        3.599|          176500.0|
|  -117.81|   33.78|              27.0|     3589.0|         507.0|    1484.0|     495.0|       5.7934|          270500.0|
|  -118.36|   33.82|              28.0|       67.0|          15.0|      49.0|      11.0|       6.1359|          330000.0|
|  -119.67|   36.33|              19.0|     1241.0|         244.0|     850.0|     237.0|       2.9375|           81700.0|
+---------+--------+----

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# pySpark data cleansing exercise

**Partitions in Spark**

Partitioning means that a dataset is not stored in a single place: it is split into chunks (partitions), and these chunks are distributed across the nodes of the cluster.

If you have one partition, Spark will only have a parallelism of one, even if you have thousands of executors. Likewise, if you have many partitions but only one single-core executor, Spark will still only have a parallelism of one, because there is only one computation resource.
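The rule in the paragraph above can be written as a one-line bound. This is a minimal sketch, assuming one task per core (Spark's default); the function name and its parameters are illustrative and not part of the Spark API.

```python
def effective_parallelism(num_partitions: int, total_executor_cores: int) -> int:
    """Upper bound on the number of tasks that can run at the same time.

    Assumes each task occupies exactly one core, so the bound is whichever
    is smaller: the number of partitions or the number of available cores.
    """
    return min(num_partitions, total_executor_cores)


# One partition, a thousand cores: still only one task at a time.
print(effective_parallelism(1, 1000))   # 1
# Many partitions, a single core: again one task at a time.
print(effective_parallelism(200, 1))    # 1
# Five partitions on two cores: two tasks run concurrently.
print(effective_parallelism(5, 2))      # 2
```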

Spark's lower-level RDD API lets us set the number of partitions explicitly.

Let's take a simple example to see how partitioning helps produce results faster. We will create a list of 20 million random integers between 10 and 1000 and count the numbers greater than or equal to 200.
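Before running this on Spark, the idea can be traced in plain Python. The sketch below splits a smaller list into contiguous, near-equal slices, counts the matches in each slice independently, and adds up the partial counts. The helper `split_into_slices` is hypothetical and only illustrates the concept; it is not a claim about how `sc.parallelize` distributes data internally.

```python
from random import randint, seed


def split_into_slices(data, num_slices):
    """Split a list into num_slices contiguous chunks of near-equal size."""
    n = len(data)
    return [data[i * n // num_slices:(i + 1) * n // num_slices]
            for i in range(num_slices)]


seed(0)  # fixed seed so repeated runs use the same numbers
numbers = [randint(10, 1000) for _ in range(100_000)]

slices = split_into_slices(numbers, 5)

# Each slice can be counted on its own; on a cluster this work runs in parallel.
partial_counts = [sum(1 for x in chunk if x >= 200) for chunk in slices]
total = sum(partial_counts)

print(len(slices), partial_counts, total)
```

Because every slice is processed independently, the partial counts can be computed in any order (or at the same time) and still add up to the same total.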

In [None]:
from random import randint

# create a list of 20 million random integers between 10 and 1000 (inclusive)
my_large_list = [randint(10, 1000) for _ in range(20_000_000)]

# distribute the list as an RDD with a single partition
my_large_list_one_partition = sc.parallelize(my_large_list, numSlices=1)

# check the number of partitions
print(my_large_list_one_partition.getNumPartitions())
# >> 1

# keep only the numbers greater than or equal to 200
my_large_list_one_partition = my_large_list_one_partition.filter(lambda x: x >= 200)

In [None]:
%%time
# %%time is a cell magic, so it must be the first line of its own notebook cell
# count the number of elements in the filtered RDD
print(my_large_list_one_partition.count())


16165809


In [None]:
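# distribute the same list as an RDD with five partitions
my_large_list_with_five_partition = sc.parallelize(my_large_list, numSlices=5)

# keep only the numbers greater than or equal to 200
my_large_list_with_five_partition = my_large_list_with_five_partition.filter(lambda x: x >= 200)

In [None]:
%%time
# a cell magic must be the first line of its cell; %%time is used here so the
# result is comparable with the one-partition run (%%timeit would repeat the count)
# count the number of elements in the filtered RDD
print(my_large_list_with_five_partition.count())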
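Cell magics only exist inside IPython and Jupyter. When the same comparison is run as a plain script, a small helper built on the standard library can stand in for them. The name `timed` is hypothetical, and the workload below is a pure-Python placeholder rather than an RDD action.

```python
import time


def timed(action):
    """Call a zero-argument function and return (result, elapsed_seconds)."""
    start = time.perf_counter()
    result = action()
    return result, time.perf_counter() - start


# Placeholder workload: count the integers below one million that are >= 200.
count, seconds = timed(lambda: sum(1 for x in range(1_000_000) if x >= 200))
print(f"count={count}, elapsed={seconds:.3f} s")

# With the RDDs from the cells above, the call would look like:
# count, seconds = timed(my_large_list_with_five_partition.count)
```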


In [None]:
nums = list(range(0,1000001))
len(nums)


1000001

In [None]:
# distribute the list across the cluster as an RDD
nums_rdd=sc.parallelize(nums)
nums_rdd

ParallelCollectionRDD[30] at readRDDFromFile at PythonRDD.scala:274

In [None]:
# collect() pulls every element back to the driver, which is expensive for a large RDD;
# use take() to look at just the first 5 elements
# nums_rdd.collect()

nums_rdd.take(5)


[0, 1, 2, 3, 4]

In [None]:
# apply a function to every element of the RDD
Squared_nums_rdd= nums_rdd.map(lambda x: x**2)
Squared_nums_rdd.take(5)

[0, 1, 4, 9, 16]

In [None]:
# pair each square with its number of digits: (value, digit_count)
pairs = Squared_nums_rdd.map(lambda x: (x,len(str(x))))
pairs.take(25)

In [None]:
# keep only the pairs whose digit count is even
even_digit_pairs = pairs.filter(lambda x: (x[1] % 2) == 0)
even_digit_pairs.take(25)

In [None]:
# swap the elements of each pair: (value, digit_count) -> (digit_count, value)
fliped_even_digit_pairs = even_digit_pairs.map(lambda x: (x[1],x[0]))
fliped_even_digit_pairs.take(15)
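To check what the chain of `map` and `filter` calls above computes, the same steps can be traced on a ten-element range with plain Python lists. This is only an illustration of the logic; it does not use Spark, and the variable names are chosen for this sketch.

```python
nums = range(10)

# map: square every element
squared = [x ** 2 for x in nums]

# map: pair each square with its number of digits
pairs = [(x, len(str(x))) for x in squared]

# filter: keep the pairs whose digit count is even
even_digit_pairs = [pair for pair in pairs if pair[1] % 2 == 0]

# map: swap the two elements of each pair
flipped = [(digits, value) for value, digits in even_digit_pairs]

print(flipped)
# [(2, 16), (2, 25), (2, 36), (2, 49), (2, 64), (2, 81)]
```

The single-digit squares (0, 1, 4, 9) are dropped by the filter, which is why the result starts at 16.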

# Table of Contents

1. Structure of Spark's Data Source API
   - Read API Structure
   - Write API Structure
2. Apache Spark Data Sources You Should Know About
   - CSV, JSON, Parquet, ORC, Text, JDBC/ODBC Connections



# DataFrameReader

```
# someSchema is a placeholder for a StructType or a DDL schema string
(spark.read.format("csv")
      .option("mode", "FAILFAST")
      .option("inferSchema", "true")
      .option("path", "path/to/file(s)")
      .schema(someSchema)
      .load())
```
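The read structure can be wrapped in a small function. The sketch below assumes the caller passes in an existing `SparkSession` and a DDL schema string, which `DataFrameReader.schema` accepts in addition to a `StructType`. The function name `read_csv_strict` and the example path and columns are hypothetical.

```python
def read_csv_strict(spark, path, schema_ddl):
    """Read a CSV file with an explicit schema, failing on the first malformed row.

    spark      -- an existing SparkSession
    path       -- location of the CSV file(s)
    schema_ddl -- schema as a DDL string, e.g. "id INT, name STRING"
    """
    return (
        spark.read.format("csv")
        .option("header", "true")     # first line holds the column names
        .option("mode", "FAILFAST")   # raise instead of silently nulling bad rows
        .schema(schema_ddl)           # explicit schema, so no inference pass is needed
        .load(path)
    )


# Usage in a notebook cell (hypothetical path and columns):
# df = read_csv_strict(spark, "path/to/file.csv", "id INT, name STRING")
```

Supplying the schema up front avoids the extra pass over the data that `inferSchema` needs, and makes type mismatches surface as errors rather than as unexpected string columns.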


# Writing Data

```
(dataframe.write.format("csv")
      .mode("overwrite")  # the save mode is set with .mode(), not with .option()
      .option("dateFormat", "yyyy-MM-dd")
      .option("path", "path/to/file(s)")
      .save())
```



In [None]:
# .format specifies how the file is written;
# it is optional because Spark uses Parquet by default
# .option sets writer-specific settings such as dateFormat
# .partitionBy, .bucketBy and .sortBy are only used with file-based data sources
# and control the file structure or layout at the destination
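As a sketch of how `partitionBy` shapes the output, the helper below writes a DataFrame as Parquet with one subdirectory per distinct value of a column (for example `City_Category=A/`). The function name and the output path in the usage comment are hypothetical; `df` is any existing DataFrame that contains the chosen column.

```python
def write_partitioned_parquet(df, path, partition_column):
    """Write df as Parquet, split into one subdirectory per value of partition_column."""
    (
        df.write
        .mode("overwrite")               # replace whatever already exists at path
        .partitionBy(partition_column)   # creates <path>/<column>=<value>/ directories
        .parquet(path)
    )


# Usage in a notebook cell (hypothetical output path):
# write_partitioned_parquet(df, "/content/purchases_by_city", "City_Category")
```

Partitioning on a column with few distinct values keeps the number of directories manageable and lets later reads skip the directories a filter rules out.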

In [None]:
# https://www.analyticsvidhya.com/blog/2020/10/data-engineering-101-data-sources-apache-spark/?utm_source=blog&utm_medium=working-with-pyspark-on-google-colab-for-data-scientists

# Save Modes 

- `append`: appends the output files to the list of files that already exist at that location
- `overwrite`: completely overwrites any data that already exists there
- `errorIfExists`: throws an error and fails the write if data or files already exist at the specified location (this is the default)
- `ignore`: if data or files exist at the location, does nothing with the current DataFrame
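The save mode is passed to `DataFrameWriter.mode()`. Below is a minimal sketch of a CSV writer that validates the mode before writing; the function name `write_csv` and the `SAVE_MODES` set are hypothetical conveniences, and `df` stands for any existing DataFrame.

```python
# Mode names accepted by this sketch; "error" is an alias of "errorifexists".
SAVE_MODES = {"append", "overwrite", "errorifexists", "error", "ignore"}


def write_csv(df, path, mode="errorifexists"):
    """Write df as CSV to path using the given save mode."""
    if mode.lower() not in SAVE_MODES:
        raise ValueError(f"unknown save mode: {mode!r}")
    (
        df.write.format("csv")
        .mode(mode)                            # append / overwrite / errorifexists / ignore
        .option("header", "true")              # write the column names as the first line
        .option("dateFormat", "yyyy-MM-dd")
        .save(path)
    )


# Usage in a notebook cell (hypothetical output path):
# write_csv(df, "path/to/output", mode="overwrite")
```

Checking the mode up front turns a typo such as `"replace"` into an immediate, readable error before any write is attempted.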



In [None]:
# read the CSV into a Spark DataFrame, inferring the column types from the data
df = spark.read.csv(header=True, inferSchema=True, path="/content/train.csv")
df.take(5)

[Row(User_ID=1000001, Product_ID='P00069042', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=3, Product_Category_2=None, Product_Category_3=None, Purchase=8370),
 Row(User_ID=1000001, Product_ID='P00248942', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=1, Product_Category_2=6, Product_Category_3=14, Purchase=15200),
 Row(User_ID=1000001, Product_ID='P00087842', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=12, Product_Category_2=None, Product_Category_3=None, Purchase=1422),
 Row(User_ID=1000001, Product_ID='P00085442', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=12, Product_Category_2=14, Product_Category_3=None, Purchase=1057),
 Row(User_ID=1000002, Product_ID='P0

### Show Column details

In [None]:
# The first step of exploratory data analysis (EDA) is to check the DataFrame's schema
df.printSchema()

root
 |-- User_ID: integer (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Occupation: integer (nullable = true)
 |-- City_Category: string (nullable = true)
 |-- Stay_In_Current_City_Years: string (nullable = true)
 |-- Marital_Status: integer (nullable = true)
 |-- Product_Category_1: integer (nullable = true)
 |-- Product_Category_2: integer (nullable = true)
 |-- Product_Category_3: integer (nullable = true)
 |-- Purchase: integer (nullable = true)



### Display Rows

In [None]:
# Common inspection functions
df.show(n=5, truncate=True, vertical=False)  # like pandas head()
print(df.count())  # number of rows in the DataFrame
df.select("User_ID", "Product_ID").show(5)
df.describe().show()  # summary statistics (count, mean, stddev, min, max)
df.select("City_Category").distinct().show()  # distinct values of a categorical column


+-------------+
|City_Category|
+-------------+
|            B|
|            C|
|            A|
+-------------+



In [None]:
#df.groupBy("City_Category").avg().show()
df.groupBy("City_Category").sum("Purchase").show()

+-------------+-------------+
|City_Category|sum(Purchase)|
+-------------+-------------+
|            B|   2115533605|
|            C|   1663807476|
|            A|   1316471661|
+-------------+-------------+



In [None]:
df.groupBy("City_Category").count().show(5)

+-------------+------+
|City_Category| count|
+-------------+------+
|            B|231173|
|            C|171175|
|            A|147720|
+-------------+------+



In [None]:
from pyspark.sql import functions as F

# Count the nulls in every column: when() yields a non-null literal for null rows
# and null otherwise, and count() skips nulls.
df.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns]).show()

+-------+----------+------+---+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender|Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+---+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|      0|         0|     0|  0|         0|            0|                         0|             0|                 0|            173638|            383247|       0|
+-------+----------+------+---+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+

