In [1]:
import pyspark as spark
from pyspark.sql import SparkSession

spark = SparkSession.builder\
    .appName("Python Spark SQL basic example")\
    .config("spark.some.config.option", "some-value")\
    .getOrCreate()

22/12/08 15:10:41 WARN Utils: Your hostname, users-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.0.203 instead (on interface en0)
22/12/08 15:10:41 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/12/08 15:10:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Read CSV Data

In [2]:
# read a csv file
my_data = spark.read.csv('../data/ind-ban-comment.csv',header=True)

# see the default schema of the dataframe
my_data.printSchema()

root
 |-- Batsman: string (nullable = true)
 |-- Batsman_Name: string (nullable = true)
 |-- Bowler: string (nullable = true)
 |-- Bowler_Name: string (nullable = true)
 |-- Commentary: string (nullable = true)
 |-- Detail: string (nullable = true)
 |-- Dismissed: string (nullable = true)
 |-- Id: string (nullable = true)
 |-- Isball: string (nullable = true)
 |-- Isboundary: string (nullable = true)
 |-- Iswicket: string (nullable = true)
 |-- Over: string (nullable = true)
 |-- Runs: string (nullable = true)
 |-- Timestamp: string (nullable = true)



# Defining the Schema

In [4]:
import pyspark.sql.types as tp

# define the schema
my_schema = tp.StructType([
    tp.StructField(name= 'Batsman',      dataType= tp.IntegerType(),   nullable= True),
    tp.StructField(name= 'Batsman_Name', dataType= tp.StringType(),    nullable= True),
    tp.StructField(name= 'Bowler',       dataType= tp.IntegerType(),   nullable= True),
    tp.StructField(name= 'Bowler_Name',  dataType= tp.StringType(),    nullable= True),
    tp.StructField(name= 'Commentary',   dataType= tp.StringType(),    nullable= True),
    tp.StructField(name= 'Detail',       dataType= tp.StringType(),    nullable= True),
    tp.StructField(name= 'Dismissed',    dataType= tp.IntegerType(),   nullable= True),
    tp.StructField(name= 'Id',           dataType= tp.IntegerType(),   nullable= True),
    tp.StructField(name= 'Isball',       dataType= tp.BooleanType(),   nullable= True),
    tp.StructField(name= 'Isboundary',   dataType= tp.BinaryType(),   nullable= True),
    tp.StructField(name= 'Iswicket',     dataType= tp.BinaryType(),   nullable= True),
    tp.StructField(name= 'Over',         dataType= tp.DoubleType(),    nullable= True),
    tp.StructField(name= 'Runs',         dataType= tp.IntegerType(),   nullable= True),
    tp.StructField(name= 'Timestamp',    dataType= tp.TimestampType(), nullable= True)    
])

# read the data again with the defined schema
my_data = spark.read.csv('../data/ind-ban-comment.csv',schema= my_schema,header= True)

# print the schema
my_data.printSchema()


root
 |-- Batsman: integer (nullable = true)
 |-- Batsman_Name: string (nullable = true)
 |-- Bowler: integer (nullable = true)
 |-- Bowler_Name: string (nullable = true)
 |-- Commentary: string (nullable = true)
 |-- Detail: string (nullable = true)
 |-- Dismissed: integer (nullable = true)
 |-- Id: integer (nullable = true)
 |-- Isball: boolean (nullable = true)
 |-- Isboundary: binary (nullable = true)
 |-- Iswicket: binary (nullable = true)
 |-- Over: double (nullable = true)
 |-- Runs: integer (nullable = true)
 |-- Timestamp: timestamp (nullable = true)



# Drop columns from the data

In [5]:
# drop the columns that are not required
my_data = my_data.drop(*['Batsman', 'Bowler', 'Id'])
my_data.columns

['Batsman_Name',
 'Bowler_Name',
 'Commentary',
 'Detail',
 'Dismissed',
 'Isball',
 'Isboundary',
 'Iswicket',
 'Over',
 'Runs',
 'Timestamp']

# Data Exploration using PySpark
- Check the Data Dimensions

In [6]:
# get the dimensions of the data
(my_data.count() , len(my_data.columns))
# >> (605, 11)

(605, 11)

In [7]:
# get the summary of the numerical columns
my_data.select('Isball', 'Isboundary', 'Runs').describe().show()

+-------+------------------+
|summary|              Runs|
+-------+------------------+
|  count|               605|
|   mean|0.9917355371900827|
| stddev| 1.342725481259329|
|    min|                 0|
|    max|                 6|
+-------+------------------+



In [14]:
# import sql function pyspark
import pyspark.sql.functions as f

# null values in each column
data_agg = my_data.agg(*[f.count(f.when(f.isnull(c), c)).alias(c) for c in my_data.columns])
# data_agg.show()

In [15]:
# value counts of Batsman_Name column
my_data.groupBy('Batsman_Name').count().show()

+------------------+-----+
|      Batsman_Name|count|
+------------------+-----+
|     Soumya Sarkar|   39|
|  Mashrafe Mortaza|    5|
|   Shakib Al Hasan|   75|
|   Mushfiqur Rahim|   23|
|Mohammad Saifuddin|   42|
|         Liton Das|   24|
|      Rishabh Pant|   43|
|    Mohammed Shami|    2|
|       Tamim Iqbal|   31|
|     Hardik Pandya|    2|
|          KL Rahul|   93|
| Bhuvneshwar Kumar|    4|
|     Rubel Hossain|   11|
|      Rohit Sharma|   94|
|    Dinesh Karthik|    9|
|       Virat Kohli|   27|
|          MS Dhoni|   33|
|     Sabbir Rahman|   40|
|  Mosaddek Hossain|    7|
| Mustafizur Rahman|    1|
+------------------+-----+



# Encode Categorical Variables using PySpark

In [21]:
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator

# create object of StringIndexer class and specify input and output column
SI_batsman = StringIndexer(inputCol='Batsman_Name',outputCol='Batsman_Index')
SI_bowler = StringIndexer(inputCol='Bowler_Name',outputCol='Bowler_Index')

# transform the data
my_data = SI_batsman.fit(my_data).transform(my_data)
my_data = SI_bowler.fit(my_data).transform(my_data)

# view the transformed data
my_data.select('Batsman_Name', 'Batsman_Index', 'Bowler_Name', 'Bowler_Index').show(10)

ImportError: cannot import name 'OneHotEncoderEstimator' from 'pyspark.ml.feature' (/Users/user/TENAC/pyspark-ml-pipeline/env/lib/python3.9/site-packages/pyspark/ml/feature.py)