In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *

In [2]:
spark = SparkSession.builder.config("spark.some.config.option", "some-value").getOrCreate()

22/12/29 20:18:01 WARN Utils: Your hostname, RUBIN resolves to a loopback address: 127.0.1.1; using 172.21.87.182 instead (on interface eth0)
22/12/29 20:18:01 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/12/29 20:18:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/12/29 20:18:05 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
df = spark.read.json("people.json")

                                                                                

In [4]:
df

DataFrame[age: bigint, name: string]

In [11]:
df.head(10)

[Row(age=None, name='Michael'),
 Row(age=30, name='Andy'),
 Row(age=19, name='Justin')]

In [12]:
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [13]:
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [15]:
df.select("name").show()

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+



In [23]:
df.select(["name", df['age'] + 1]).show()

+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+



In [25]:
df2 = df.select(["name", df['age'] + 1])

In [26]:
df2.show()

+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+



In [29]:
df[df["age"] > 25].show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



In [32]:
df.filter(df["age"] > 15).show()

+---+------+
|age|  name|
+---+------+
| 30|  Andy|
| 19|Justin|
+---+------+



In [38]:
df[df["age"].isNull()].show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
+----+-------+



In [40]:
df.filter(df['age'].isNotNull()).show()

+---+------+
|age|  name|
+---+------+
| 30|  Andy|
| 19|Justin|
+---+------+



In [42]:
df.groupBy("age").count().show()

+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+



# Registering DataFrame as SQL temporary view

In [43]:
df.createOrReplaceTempView("people")

In [44]:
sqlDF = spark.sql("Select * from people")

In [45]:
sqlDF.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [48]:
spark.sql("Select * from people where age > 15").show()

+---+------+
|age|  name|
+---+------+
| 30|  Andy|
| 19|Justin|
+---+------+



# SparkContext RDD

In [54]:
sc = spark.sparkContext

In [55]:
lines = sc.textFile("people.txt")

In [56]:
lines.top(3)

                                                                                

['Michael, 29', 'Justin, 19', 'Andy, 30']

In [57]:
lines.take(2)

['Michael, 29', 'Andy, 30']

In [59]:
parts = lines.map(lambda x : x.split(","))

In [60]:
parts.top(3)

[['Michael', ' 29'], ['Justin', ' 19'], ['Andy', ' 30']]

In [64]:
parts_tuple = parts.map(lambda x : (x[0], x[1].strip()))

In [66]:
parts_tuple.collect()

[('Michael', '29'), ('Andy', '30'), ('Justin', '19')]

# Programmatic_schema_example

In [67]:
sc = spark.sparkContext

In [75]:
lines = sc.textFile("people.txt")
parts = lines.map(lambda x : x.split(","))
split_parts = parts.map(lambda p : (p[0], p[1].strip()))

In [77]:
parts.collect()

[['Michael', ' 29'], ['Andy', ' 30'], ['Justin', ' 19']]

In [76]:
split_parts.collect()

[('Michael', '29'), ('Andy', '30'), ('Justin', '19')]

# Defining a schema

In [83]:
 schema = StructType([
     StructField('name', StringType(), True),
     StructField('age', StringType(), True)
 ])

In [79]:
# applying schema to RDD

In [84]:
schemaPeople = spark.createDataFrame(split_parts, schema)

In [85]:
schemaPeople.show()

+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+



In [86]:
# creating temporary Table/view

In [87]:
schemaPeople.createOrReplaceTempView("people")

In [94]:
spark.sql("select * from people where age >= 19").show()

+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+



In [95]:
spark.stop()