In [2]:
!pip install pyspark py4j


Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=ace9d0cf81448714eef921f2e8c1f93ff83a317eeae2c26364bb85329da0bd05
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [3]:
import pyspark
from pyspark import SparkContext, SparkConf

# Configure the application and obtain the SparkContext.
# getOrCreate() hands back the already-running context when this cell is
# executed again, whereas constructing SparkContext(...) a second time raises
# "ValueError: Cannot run multiple SparkContexts at once".
conf = SparkConf().setAppName("tutorial")
sc = SparkContext.getOrCreate(conf=conf)

In [None]:
# Load the employee file as an RDD; each element is one line of text.
rdd = sc.textFile("/content/sample_data/employee.txt")

In [None]:
# Confirm that textFile() produced an RDD object.
type(rdd)

pyspark.rdd.RDD

In [None]:
# top(2) returns the two largest lines in descending order, not the first two
# lines of the file; use take(2) to preview in file order.
rdd.top(2)

['Emp_id,Last_name,designation,job_id,hire_date,base_salary,commission,increment_pct',
 '7975,KUNAL,SALESMAN,7698,15-Sep-82,1300,250,20']

In [None]:
# first() returns the first element of the RDD, i.e. the header line of the file.
header = rdd.first()

In [None]:
# The header is a plain Python string, so it can be compared against each line.
type(header)

str

In [None]:
# Drop the header by keeping only the lines that differ from it, then
# inspect the two largest remaining lines.
def _is_not_header(row):
    return row != header

rdd = rdd.filter(_is_not_header)
rdd.top(2)

['7975,KUNAL,SALESMAN,7698,15-Sep-82,1300,250,20',
 '7934,MILLER,CLERK,7782,23-Jan-82,1300,\\000,10']

In [None]:
# The third comma-separated field of every record is its designation.
def _designation_of(record):
    fields = record.split(",")
    return fields[2]

designation = rdd.map(_designation_of)



# SPARK SQL



In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *

In [6]:
# Build the SparkSession, or reuse the one that is already running.
# Additional settings can be chained with further .config(...) calls; the
# single option below is left as it was (it looks like a placeholder).
spark = (
    SparkSession.builder
    .appName("tutorial")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

In [7]:
# Read a JSON file into a DataFrame. No schema is passed, so the reader infers
# the column names and types from the records.
df = spark.read.json("/content/sample_data/people.json")

In [8]:
# Print the DataFrame rows as a text table.
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [9]:
# Print the column names, types and nullability of the DataFrame.
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [10]:
# Select specific columns from a Spark DataFrame.
# show() only prints and returns None, so keep the selected DataFrame in
# df_name first and display it in a separate statement.
df_name = df.select("name", "age")
df_name.show()

+-------+----+
|   name| age|
+-------+----+
|Michael|null|
|   Andy|  30|
| Justin|  19|
+-------+----+



In [11]:
# select() can mix plain column names with column expressions.
age_plus_one = df["age"] + 1
df_test = df.select("name", age_plus_one)
df_test.show()

+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+



In [12]:
# Keep only the rows whose age is greater than 18; rows with a null age do
# not satisfy the predicate and are dropped as well.
is_over_18 = df["age"] > 18
df_filter = df.where(is_over_18)
df_filter.show()

+---+------+
|age|  name|
+---+------+
| 30|  Andy|
| 19|Justin|
+---+------+



In [13]:
# Count how many rows share each age value (null forms its own group).
rows_by_age = df.groupBy("age")
df_count = rows_by_age.count()
df_count.show()

+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+



In [16]:
# Show the full DataFrame, then only the row(s) whose name is "Michael".
df.show()
name_is_michael = df["name"] == "Michael"
df_filtered = df.filter(name_is_michael)
df_filtered.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
+----+-------+



# Registering the DataFrame as a SQL temporary view
This creates a named view over the DataFrame so that it can be queried with SQL.

In [18]:
# Register the DataFrame under the name "people" so it can be queried with
# spark.sql(); an existing view with the same name is replaced.
df.createOrReplaceTempView("people")

In [22]:
# Query the "people" temporary view with plain SQL; the result is a DataFrame.
query = "select * from people"
sqlDF = spark.sql(query)
sqlDF.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



# Getting the Spark context from the Spark session (the newer way)

In [23]:
# Take the SparkContext from the SparkSession instead of constructing one directly.
sc = spark.sparkContext

In [24]:
# Read the JSON file as raw text: one RDD element per line, no JSON parsing.
lines = sc.textFile("/content/sample_data/people.json")

In [30]:
# Naive tokenisation: split every raw line on commas (this does not parse the JSON).
parts = lines.map(lambda item : item.split(","))
# top(3) returns the three largest elements in descending order.
parts.top(3)
# Disabled follow-up step, kept for reference.
# NOTE(review): item[1] would raise IndexError for lines without a comma
# (e.g. a record that only has a "name" field).
# people = parts.map(lambda item : (item[0], item[1].strip()))
# people.top(2)

[['{"name":"Michael"}'],
 ['{"name":"Justin"', ' "age":19}'],
 ['{"name":"Andy"', ' "age":30}']]

# How to define your own schema in PySpark

In [31]:
# Declare the schema explicitly as two nullable string columns.
# NOTE(review): "age" is typed as a string here, while the JSON reader
# inferred it as long - confirm this is intended.
schema_list = [
    StructField("name", StringType(), nullable=True),
    StructField("age", StringType(), nullable=True),
]
schema = StructType(fields=schema_list)

In [34]:
# Shut down the SparkContext.
sc.stop()

In [35]:
# Stop the SparkSession (this also stops its underlying SparkContext).
spark.stop()