In [1]:
import findspark
findspark.init()

# PySpark is the Spark API for Python. In this lab, we use PySpark to initialize the spark context. 
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

# Creating a spark context class
sc = SparkContext()

# Creating a spark session
spark = SparkSession \
    .builder \
    .appName("Python Spark DataFrames basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

if 'spark' in locals() and isinstance(spark, SparkSession):
    print("SparkSession is active and ready to use.")
else:
    print("SparkSession is not active. Please create a SparkSession.")


data = range(1,30)
# print first element of iterator
print(data[0])
len(data)
xrangeRDD = sc.parallelize(data, 4)

# this will let us know that we created an RDD
xrangeRDD


subRDD = xrangeRDD.map(lambda x: x-1)
filteredRDD = subRDD.filter(lambda x : x<10)

print(filteredRDD.collect())
filteredRDD.count()

SparkSession is active and ready to use.
1
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]


10

In [2]:
import time

test = sc.parallelize(range(1, 50000), 4)
test.cache()

t1 = time.time()
count1 = test.count()
dt1 = time.time() - t1
print('dt1:', dt1)

t2 = time.time()
count2 = test.count()
dt2 = time.time() - t2
print('dt2:', dt2)


dt1: 8.247710466384888
dt2: 4.017469882965088


In [3]:
!curl https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/people.json >> people.json

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed

  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
  0     0    0     0    0     0      0      0 --:--:--  0:00:01 --:--:--     0
100    73  100    73    0     0     34      0  0:00:02  0:00:02 --:--:--    34
100    73  100    73    0     0     34      0  0:00:02  0:00:02 --:--:--    34


In [4]:
df = spark.read.json('people.json').cache()

In [5]:
df.show()
df.printSchema()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [6]:
df.createTempView('people')

In [7]:
df.select('name').show()
df.select(df['name']).show()

spark.sql("Select name from people").show()

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
|Michael|
|   Andy|
| Justin|
+-------+

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
|Michael|
|   Andy|
| Justin|
+-------+

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
|Michael|
|   Andy|
| Justin|
+-------+



In [8]:
df.filter(df['age']> 21).show()

spark.sql('select * from people where age > 21').show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
| 30|Andy|
+---+----+

+---+----+
|age|name|
+---+----+
| 30|Andy|
| 30|Andy|
+---+----+



In [9]:
# Perfom basic aggregation of data

df.groupBy("age").count().show()
spark.sql("SELECT age, COUNT(age) as count FROM people GROUP BY age").show()

+----+-----+
| age|count|
+----+-----+
|  19|    2|
|null|    2|
|  30|    2|
+----+-----+

+----+-----+
| age|count|
+----+-----+
|  19|    2|
|null|    0|
|  30|    2|
+----+-----+



In [10]:
df.schema

StructType([StructField('age', LongType(), True), StructField('name', StringType(), True)])