In [1]:
!pip install pyspark



In [2]:
import os, sys
from pyspark.sql import SparkSession

In [3]:
# Make PySpark launch its Python processes with the same interpreter that runs
# this kernel. This must happen before the SparkSession is created.
os.environ.update({
    "PYSPARK_PYTHON": sys.executable,
    "PYSPARK_DRIVER_PYTHON": sys.executable,
})

In [13]:
# Toy student records as (name, branch, marks) tuples.
# "John" appears twice: once in CS and once in ME.
data = [
    # name     branch  marks
    ("John",   "CS",   78),
    ("Sam",    "EC",   90),
    ("Max",    "CS",   65),
    ("Nick",   "CS",   45),
    ("Adam",   "EC",   91),
    ("Shawn",  "ME",   88),
    ("John",   "ME",   37),
    ("Steve",  "CS",   63),
]

In [5]:
# Local SparkSession using a single worker thread ("local[1]").
# getOrCreate() reuses an already-active session instead of starting a new one.
spark = (
    SparkSession.builder
    .master("local[1]")
    .appName("Example")
    .getOrCreate()
)

In [6]:
# Display the session via its rich notebook repr; print() only yields the
# default `<... SparkSession object at 0x...>` text.
spark

<pyspark.sql.session.SparkSession object at 0x00000281645B5DF0>


In [14]:
# Distribute the local `data` list as an RDD of (name, branch, marks) tuples.
rdd = spark.sparkContext.parallelize(data)

In [15]:
# Convert the RDD without supplying column names: Spark falls back to the
# positional defaults _1, _2, _3 and infers the types from the tuples.
df = rdd.toDF()

In [16]:
# The plain repr lists only the column names and types, not the rows.
df

DataFrame[_1: string, _2: string, _3: bigint]

In [17]:
# Print the inferred schema as a tree (column name, type, nullability).
df.printSchema()

root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)
 |-- _3: long (nullable = true)



In [18]:
# Render the rows as a text table (show() defaults to at most 20 rows).
df.show()

+-----+---+---+
|   _1| _2| _3|
+-----+---+---+
| John| CS| 78|
|  Sam| EC| 90|
|  Max| CS| 65|
| Nick| CS| 45|
| Adam| EC| 91|
|Shawn| ME| 88|
| John| ME| 37|
|Steve| CS| 63|
+-----+---+---+



In [21]:
# Supply explicit column names instead of relying on the _1/_2/_3 defaults;
# the column types are still inferred from the data.
columnName = ["Name", "Branch", "Marks"]
df_2 = rdd.toDF(schema=columnName)

In [22]:
# Same inferred types as `df`, but now with the supplied column names.
df_2.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Branch: string (nullable = true)
 |-- Marks: long (nullable = true)



In [23]:
# Display the rows under the named columns.
df_2.show()

+-----+------+-----+
| Name|Branch|Marks|
+-----+------+-----+
| John|    CS|   78|
|  Sam|    EC|   90|
|  Max|    CS|   65|
| Nick|    CS|   45|
| Adam|    EC|   91|
|Shawn|    ME|   88|
| John|    ME|   37|
|Steve|    CS|   63|
+-----+------+-----+



In [24]:
# Another way to create the DataFrame: pass the column names directly as the
# schema of createDataFrame; the column types are inferred from the RDD.
df_3 = spark.createDataFrame(rdd, schema=columnName)

In [25]:
# Should match df_2 row for row: same data, same column names.
df_3.show()

+-----+------+-----+
| Name|Branch|Marks|
+-----+------+-----+
| John|    CS|   78|
|  Sam|    EC|   90|
|  Max|    CS|   65|
| Nick|    CS|   45|
| Adam|    EC|   91|
|Shawn|    ME|   88|
| John|    ME|   37|
|Steve|    CS|   63|
+-----+------+-----+



In [26]:
# Create an explicit schema for the DataFrame

In [27]:
from pyspark.sql.types import StructType, StringType, IntegerType, StructField

In [31]:
# Explicit schema for the (name, branch, marks) records.
# StudentMarks is numeric: the marks in `data` are Python ints. With
# StringType, Spark silently stringifies them, so sorting and arithmetic on the
# column would be lexicographic/string-based.
schema = StructType([
    StructField("StudentName", StringType()),
    StructField("StudentBranch", StringType()),
    StructField("StudentMarks", IntegerType()),
])

In [32]:
# Build the DataFrame from the local list using `schema`, so the column names
# and types come from the schema rather than from inference.
df_4 = spark.createDataFrame(data, schema)

In [33]:
# Rows appear under the schema's StudentName/StudentBranch/StudentMarks columns.
df_4.show()

+-----------+-------------+------------+
|StudentName|StudentBranch|StudentMarks|
+-----------+-------------+------------+
|       John|           CS|          78|
|        Sam|           EC|          90|
|        Max|           CS|          65|
|       Nick|           CS|          45|
|       Adam|           EC|          91|
|      Shawn|           ME|          88|
|       John|           ME|          37|
|      Steve|           CS|          63|
+-----------+-------------+------------+



In [34]:
# Load the Summer Olympics medal CSV; the first line supplies the column names.
# No inferSchema is requested, so every column (including Year) is read as a string.
summer_df = spark.read.option("header", True).csv("summer.csv")

In [35]:
summer_df.show(5)

+----+------+--------+----------+-----------------+-------+------+--------------------+------+
|Year|  City|   Sport|Discipline|          Athlete|Country|Gender|               Event| Medal|
+----+------+--------+----------+-----------------+-------+------+--------------------+------+
|1896|Athens|Aquatics|  Swimming|     HAJOS Alfred|    HUN|   Men|      100M Freestyle|  Gold|
|1896|Athens|Aquatics|  Swimming|  HERSCHMANN Otto|    AUT|   Men|      100M Freestyle|Silver|
|1896|Athens|Aquatics|  Swimming| DRIVAS Dimitrios|    GRE|   Men|100M Freestyle Fo...|Bronze|
|1896|Athens|Aquatics|  Swimming|MALOKINIS Ioannis|    GRE|   Men|100M Freestyle Fo...|  Gold|
|1896|Athens|Aquatics|  Swimming|CHASAPIS Spiridon|    GRE|   Men|100M Freestyle Fo...|Silver|
+----+------+--------+----------+-----------------+-------+------+--------------------+------+
only showing top 5 rows



In [36]:
# Total number of rows. count() is an action, so this runs a Spark job over the file.
summer_df.count()

31165

In [37]:
# Number of columns, read from the DataFrame's column list.
len(summer_df.columns)

9

In [39]:
# Convert a small sample of the Spark DataFrame (not an RDD) to a pandas
# DataFrame. limit(5) is applied first, so only 5 rows are collected to the
# driver rather than the whole dataset.
summer_df.limit(5).toPandas()

Unnamed: 0,Year,City,Sport,Discipline,Athlete,Country,Gender,Event,Medal
0,1896,Athens,Aquatics,Swimming,HAJOS Alfred,HUN,Men,100M Freestyle,Gold
1,1896,Athens,Aquatics,Swimming,HERSCHMANN Otto,AUT,Men,100M Freestyle,Silver
2,1896,Athens,Aquatics,Swimming,DRIVAS Dimitrios,GRE,Men,100M Freestyle For Sailors,Bronze
3,1896,Athens,Aquatics,Swimming,MALOKINIS Ioannis,GRE,Men,100M Freestyle For Sailors,Gold
4,1896,Athens,Aquatics,Swimming,CHASAPIS Spiridon,GRE,Men,100M Freestyle For Sailors,Silver


In [40]:
# Project three columns by name; select() also accepts a single list of names.
summer_df.select(["Year", "Athlete", "Country"]).show(5)

+----+-----------------+-------+
|Year|          Athlete|Country|
+----+-----------------+-------+
|1896|     HAJOS Alfred|    HUN|
|1896|  HERSCHMANN Otto|    AUT|
|1896| DRIVAS Dimitrios|    GRE|
|1896|MALOKINIS Ioannis|    GRE|
|1896|CHASAPIS Spiridon|    GRE|
+----+-----------------+-------+
only showing top 5 rows



In [41]:
# Select columns by position: the slice [3:6] takes 0-based indices 3, 4 and 5,
# i.e. the 4th-6th columns (Discipline, Athlete, Country).
summer_df.select(summer_df.columns[3:6]).show(5)

+----------+-----------------+-------+
|Discipline|          Athlete|Country|
+----------+-----------------+-------+
|  Swimming|     HAJOS Alfred|    HUN|
|  Swimming|  HERSCHMANN Otto|    AUT|
|  Swimming| DRIVAS Dimitrios|    GRE|
|  Swimming|MALOKINIS Ioannis|    GRE|
|  Swimming|CHASAPIS Spiridon|    GRE|
+----------+-----------------+-------+
only showing top 5 rows



In [42]:
# How to add or derive columns

In [43]:
from pyspark.sql.functions import col, lit

In [45]:
# Preview only the first three rows.
df_3.show(n=3)

+----+------+-----+
|Name|Branch|Marks|
+----+------+-----+
|John|    CS|   78|
| Sam|    EC|   90|
| Max|    CS|   65|
+----+------+-----+
only showing top 3 rows



In [50]:
# Derive a new column from an existing one with withColumn.
# NOTE(review): Marks / 100 yields a fraction (e.g. 0.78), not a percentage;
# this assumes marks are out of 100. Confirm the intended name and scale.
(
    df_3
    .withColumn("Percentage", col("Marks") / 100)
    .show()
)

+-----+------+-----+----------+
| Name|Branch|Marks|Percentage|
+-----+------+-----+----------+
| John|    CS|   78|      0.78|
|  Sam|    EC|   90|       0.9|
|  Max|    CS|   65|      0.65|
| Nick|    CS|   45|      0.45|
| Adam|    EC|   91|      0.91|
|Shawn|    ME|   88|      0.88|
| John|    ME|   37|      0.37|
|Steve|    CS|   63|      0.63|
+-----+------+-----+----------+



In [51]:
# Add a constant column: lit() wraps a Python literal as a Spark column expression.
(
    df_3
    .withColumn("College", lit("Christ University"))
    .show(5)
)

+----+------+-----+-----------------+
|Name|Branch|Marks|          College|
+----+------+-----+-----------------+
|John|    CS|   78|Christ University|
| Sam|    EC|   90|Christ University|
| Max|    CS|   65|Christ University|
|Nick|    CS|   45|Christ University|
|Adam|    EC|   91|Christ University|
+----+------+-----+-----------------+
only showing top 5 rows



In [52]:
# Keep only CS students, using a column-expression predicate.
(
    df_3
    .filter(col("Branch") == "CS")
    .show()
)

+-----+------+-----+
| Name|Branch|Marks|
+-----+------+-----+
| John|    CS|   78|
|  Max|    CS|   65|
| Nick|    CS|   45|
|Steve|    CS|   63|
+-----+------+-----+



In [55]:
# Everyone outside CS, using a SQL expression string (`<>` means "not equal").
(
    df_3
    .filter("Branch <> 'CS'")
    .show()
)

+-----+------+-----+
| Name|Branch|Marks|
+-----+------+-----+
|  Sam|    EC|   90|
| Adam|    EC|   91|
|Shawn|    ME|   88|
| John|    ME|   37|
+-----+------+-----+



In [57]:
# Register df_3 as a temporary view named "DataSet" so it can be queried with
# plain SQL; this query matches the filter("Branch <> 'CS'") result.
df_3.createOrReplaceTempView("DataSet")
(
    spark
    .sql("select Name, Branch, Marks from DataSet where Branch != 'CS'")
    .show()
)

+-----+------+-----+
| Name|Branch|Marks|
+-----+------+-----+
|  Sam|    EC|   90|
| Adam|    EC|   91|
|Shawn|    ME|   88|
| John|    ME|   37|
+-----+------+-----+



In [58]:
# Total marks per branch; the result column is named "sum(Marks)".
# Group order in the output is not guaranteed.
(
    df_3
    .groupBy("Branch")
    .sum("Marks")
    .show()
)

+------+----------+
|Branch|sum(Marks)|
+------+----------+
|    EC|       181|
|    ME|       125|
|    CS|       251|
+------+----------+

