In [1]:
!pip install pyspark
!pip install findspark

Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py): started
  Building wheel for pyspark (setup.py): finished with status 'done'
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845518 sha256=4ddd9b29d68c88357baa59429f6b4edc5fb322f79e90845062b9d69c56db0f51
  Stored in directory: c:\users\tan\appdata\local\pip\cache\wheels\43\dc\11\ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.1
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [2]:
import findspark
findspark.init()

In [4]:
# PySpark is the Spark API for Python. In this lab, we use PySpark to initialize the spark context. 
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

In [5]:
# Creating a spark context class
sc = SparkContext()

# Creating a spark session
spark = SparkSession \
    .builder \
    .appName("Python Spark DataFrames basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [11]:
data = range(1,30)

In [19]:
data[0]

1

In [17]:
#create an RDD
xrangeRDD = sc.parallelize(data, 4)


In [18]:
# this will let us know that we created an RDD
xrangeRDD

PythonRDD[1] at RDD at PythonRDD.scala:53

In [20]:
#Create an Sub Resilient Distributed Dataset
subRDD = xrangeRDD.map(lambda x: x-1)

In [23]:
filteredRDD = subRDD.filter(lambda x : x<10)

In [24]:

print("Before transformation")
print(xrangeRDD.collect())


print("After Transformation")
print(subRDD.collect())

print("Apply a filter less than 10")
print(filteredRDD.collect())

Before transformation
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
After Transformation
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28]
Apply a filter less than 10
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]


## RDD with pandas dataframe

In [25]:
import pandas as pd

In [26]:
path = r"C:\Jeison\Python\Spark\first"

In [29]:
db = pd.read_excel(path + "\data.xlsx")

In [30]:
db.head()

Unnamed: 0,Name,Age
0,Jeison,10
1,Jonathan,20
2,Jacqueline,30


In [31]:
SparkRDD = sc.parallelize(db,4)

In [32]:
SparkRDD

ParallelCollectionRDD[4] at readRDDFromFile at PythonRDD.scala:274

In [33]:
sub_SparkRDD = SparkRDD.map(lambda x : x*2)

In [34]:
print(sub_SparkRDD.collect())

['NameName', 'AgeAge']


# Datafraes and SparkSQL

In [38]:
spark

In [39]:
!curl https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/people.json >> people.json

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed

  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
100    73  100    73    0     0     78      0 --:--:-- --:--:-- --:--:--    78


In [40]:
#create an spark dataframe
df = spark.read.json("people.json").cache()

In [42]:
#print DataFarme 
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [43]:
#print Schema
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [44]:
# Register the DataFrame as a SQL temporary view
df.createTempView("people")

In [45]:
df.select("name").show()

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+



In [46]:
spark.sql("SELECT name FROM people").show()

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+



FILTERING BY SPECIFIC VALUES

In [47]:
df.filter(df["age"] > 21).show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



In [50]:
spark.sql("SELECT age, name FROM people WHERE age > 21").show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



In [51]:
# Perfom basic aggregation of data

df.groupBy("age").count().show()
spark.sql("SELECT age, COUNT(age) as count FROM people GROUP BY age").show()

+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+

+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    0|
|  30|    1|
+----+-----+

