In [3]:
# Install the pyspark library so that Spark can be used from Python

In [4]:
!pip install pyspark
#we already install this library

In [5]:
import pyspark

In [6]:
#Call the library to start the session
from pyspark.sql import SparkSession 

In [7]:
# Create the SparkSession for this notebook, or reuse the one that already exists
spark_runing = (
    SparkSession.builder
    .appName('testing')
    .getOrCreate()
)

In [8]:
# Echo the session object; as the last expression of the cell its repr is displayed
spark_runing

In [9]:
# Load the CSV with pandas as well and preview its first two rows
import pandas as pd
demo_csv = pd.read_csv('test1.csv')
demo_csv.head(2)

Unnamed: 0,Name,age,Experience,Salary
0,Krish,31,10,30000
1,Sudhanshu,30,8,25000


# read a dataset with spark

In [10]:
df_pyspark = spark_runing.read.csv('test1.csv')
# Echo the DataFrame: without a header option the columns get the
# default names _c0.._c3 and are all read as strings
df_pyspark

DataFrame[_c0: string, _c1: string, _c2: string, _c3: string]

This command prints the table's rows (`show()` displays the first 20 rows by default).

In [11]:
# Print the rows as a text table; the header line of the file shows up
# as the first data row because no header option was set when reading
df_pyspark.show()

+---------+---+----------+------+
|      _c0|_c1|       _c2|   _c3|
+---------+---+----------+------+
|     Name|age|Experience|Salary|
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [12]:
# Use the first row of the imported file as the column names.
# option('header', 'true') makes the reader treat the first row as the header;
# inferSchema=True additionally asks Spark to infer the column types from the data.

df_pyspark_title = (
    spark_runing.read
    .option('header', 'true')
    .csv('test1.csv', inferSchema=True)
)

In [13]:
# The first row of the file is now used for the column names
df_pyspark_title.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



### type

In [14]:
# Check the Python type of the Spark DataFrame
type(df_pyspark_title)

pyspark.sql.dataframe.DataFrame

In [15]:
# Check the Python type of the pandas DataFrame, for comparison
type(demo_csv)

pandas.core.frame.DataFrame

In [16]:
# Print the schema (column names, types and nullability).
# The numeric columns show up as integer rather than string because
# df_pyspark_title was read with inferSchema=True.
df_pyspark_title.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



------------------------------------------------------------------------------------------------

# Pyspark Dataframe

* PySpark Dataframe
* Reading The Dataset
* Checking the Datatypes of the Column(Schema)
* Selecting Columns And Indexing
* Check Describe option similar to Pandas
* Adding Columns
* Dropping columns
* Renaming Columns

In [17]:
# inferSchema=True lets Spark derive each column's data type from the file
# contents instead of reading every column as a string
df_pyspark_title_inferSchema = (
    spark_runing.read
    .option('header', 'true')
    .csv('test1.csv', inferSchema=True)
)

In [18]:
# Print the schema of the DataFrame that was read with inferSchema=True
df_pyspark_title_inferSchema.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [19]:
# Return all column names as a list
df_pyspark_title_inferSchema.columns

['Name', 'age', 'Experience', 'Salary']

In [20]:
# Return the first 2 rows as a list of Row objects
df_pyspark_title_inferSchema.head(2)

[Row(Name='Krish', age=31, Experience=10, Salary=30000),
 Row(Name='Sudhanshu', age=30, Experience=8, Salary=25000)]

# selecting

In [21]:
# Select a single column and show it.
# NOTE(review): the schema declares the column as 'Name'; the lowercase
# 'name' resolves here (see output), presumably because column resolution
# is case-insensitive in this session — confirm spark.sql.caseSensitive.
df_pyspark_title_inferSchema.select('name').show()

+---------+
|     name|
+---------+
|    Krish|
|Sudhanshu|
|    Sunny|
|     Paul|
|   Harsha|
|  Shubham|
+---------+



selecting 2 columns

In [22]:
# Select several columns at once by passing a list of column names
df_pyspark_title_inferSchema.select(['name','salary']).show()

+---------+------+
|     name|salary|
+---------+------+
|    Krish| 30000|
|Sudhanshu| 25000|
|    Sunny| 20000|
|     Paul| 20000|
|   Harsha| 15000|
|  Shubham| 18000|
+---------+------+



checking the data types

In [23]:
# List the (column name, data type) pairs
df_pyspark_title_inferSchema.dtypes

[('Name', 'string'), ('age', 'int'), ('Experience', 'int'), ('Salary', 'int')]

In [24]:
# Show summary statistics (count, mean, stddev, min, max) for every column
df_pyspark_title_inferSchema.describe().show()

+-------+------+------------------+-----------------+------------------+
|summary|  Name|               age|       Experience|            Salary|
+-------+------+------------------+-----------------+------------------+
|  count|     6|                 6|                6|                 6|
|   mean|  null|26.333333333333332|4.666666666666667|21333.333333333332|
| stddev|  null| 4.179314138308661|3.559026084010437| 5354.126134736337|
|    min|Harsha|                21|                1|             15000|
|    max| Sunny|                31|               10|             30000|
+-------+------+------------------+-----------------+------------------+



# adding column in spark

In [25]:
# Add a derived column holding Experience plus 2 years
df_pyspark_add_column = df_pyspark_title_inferSchema.withColumn(
    'experience after 2 years',
    df_pyspark_title_inferSchema['Experience'] + 2,
)

In [26]:
# Show the DataFrame with the new column appended
df_pyspark_add_column.show()

+---------+---+----------+------+------------------------+
|     Name|age|Experience|Salary|experience after 2 years|
+---------+---+----------+------+------------------------+
|    Krish| 31|        10| 30000|                      12|
|Sudhanshu| 30|         8| 25000|                      10|
|    Sunny| 29|         4| 20000|                       6|
|     Paul| 24|         3| 20000|                       5|
|   Harsha| 21|         1| 15000|                       3|
|  Shubham| 23|         2| 18000|                       4|
+---------+---+----------+------+------------------------+



# dropping the column

In [27]:
# Remove the derived column again and keep the result under a new name
df_pyspark2 = df_pyspark_add_column.drop(
    'experience after 2 years'
)

In [28]:
# Show the result: the derived column is gone
df_pyspark2.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



# rename column

In [29]:
# Show the data with the Name column renamed to 'xxx'.
# The renamed result is only displayed, not assigned, so the name
# df_pyspark2 still refers to the DataFrame with the original column name.
df_pyspark2.withColumnRenamed('name', 'xxx').show()

+---------+---+----------+------+
|      xxx|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



-----

# Pyspark Handling Missing Values

* Dropping Columns
* Dropping Rows
* Various Parameter In Dropping functionalities
* Handling Missing Values by Mean, Median and Mode

In [32]:
# Load the table that contains missing values, using the first row as
# header and letting Spark infer the column types
df_missing_value = spark_runing.read.csv(
    'test2.csv',
    header=True,
    inferSchema=True,
)

In [35]:
# Show the data, including the rows that contain nulls.
# NOTE(review): in the last two rows numbers appear under Name and 38000
# under Experience — the values look shifted one column to the left;
# verify the contents of test2.csv.
df_missing_value.show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|null|      null| 40000|
|       34|  10|     38000|  null|
|       36|null|      null|  null|
+---------+----+----------+------+



# Drop null values

In [36]:
# .na gives access to the DataFrame's missing-value helpers.
# drop() with no arguments removes every row that contains at least one null.
df_missing_value.na.drop().show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [41]:
# how="all" drops a row only when every one of its values is null;
# how="any" drops a row as soon as one of its values is null.
# No row here is entirely null, so all rows are kept.
df_missing_value.na.drop(how="all").show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|null|      null| 40000|
|       34|  10|     38000|  null|
|       36|null|      null|  null|
+---------+----+----------+------+



In [42]:
# Threshold: thresh=2 keeps only the rows that have at least 2 non-null values.
# When thresh is given it takes precedence over `how`, so `how` is left out.
df_missing_value.na.drop(thresh=2).show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|null|      null| 40000|
|       34|  10|     38000|  null|
+---------+----+----------+------+



In [45]:
# subset restricts the null check to the listed columns:
# a row is dropped when its Experience value is null,
# whatever the other columns contain
df_missing_value.na.drop(how="any", subset=["Experience"]).show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
|       34| 10|     38000|  null|
+---------+---+----------+------+



# filling missing values

In [48]:
# Replace nulls with 0, but only in the Salary and age columns
df_filling_Va = df_missing_value.na.fill(
    0,
    subset=["salary", "age"],
)

In [51]:
# Show the result: nulls in age and Salary are now 0, Experience is untouched
df_filling_Va.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
|   Mahesh|  0|      null| 40000|
|       34| 10|     38000|     0|
|       36|  0|      null|     0|
+---------+---+----------+------+



Fill missing values using the:
* Mean 
* Median
* Mode

In [62]:
# Imputer: estimator that completes missing values using the mean, median
# or mode of each input column
from pyspark.ml.feature import Imputer
addingvalues = Imputer(
    # strategy used to compute the replacement value
    strategy='mean',
    # columns in which missing values are looked for
    inputCols=['age', 'Experience', 'Salary'],
    # one new "<column>_imputed" column is created per input column
    outputCols=[
        f"{c}_imputed" for c in ['age', 'Experience', 'Salary']
    ],
)
    


In [59]:
# fit(): the estimator learns what it needs from the data (here, the mean
#        of each input column) and returns a fitted model
# transform(): the fitted model uses what it learned to add the imputed
#        columns to the DataFrame it is given
(
    addingvalues
    .fit(df_missing_value)
    .transform(df_missing_value)
    .show()
)

+---------+----+----------+------+-----------+------------------+--------------+
|     Name| age|Experience|Salary|age_imputed|Experience_imputed|Salary_imputed|
+---------+----+----------+------+-----------+------------------+--------------+
|    Krish|  31|        10| 30000|         31|                10|         30000|
|Sudhanshu|  30|         8| 25000|         30|                 8|         25000|
|    Sunny|  29|         4| 20000|         29|                 4|         20000|
|     Paul|  24|         3| 20000|         24|                 3|         20000|
|   Harsha|  21|         1| 15000|         21|                 1|         15000|
|  Shubham|  23|         2| 18000|         23|                 2|         18000|
|   Mahesh|null|      null| 40000|         24|              5432|         40000|
|       34|  10|     38000|  null|         10|             38000|         24000|
|       36|null|      null|  null|         24|              5432|         24000|
+---------+----+----------+-

* CountVectorizer
    - fit learns the vocabulary
    - transform creates a document-term matrix using the vocabulary
* SimpleImputer
    * fit learns the value to impute
    * transform fills in missing entries using the imputation value
* StandardScaler
    * fit learns the mean and scale of each feature
    * transform standardizes the features using the mean and scale
* HashingVectorizer
    * fit is not used, and thus it is known as a "stateless" transformer
    * transform creates the document-term matrix using a hash of the token

# Difference between fit() & transform()

1. `fit()`: computes the parameters the transformer needs from the data (for example, the mean μ and standard deviation σ for a scaler) and stores them on the fitted object.

2. `transform()`: applies the transformation to a given dataset using those previously computed parameters.

3. `fit_transform()` (scikit-learn): combines `fit()` and `transform()` in a single call on the same dataset.