In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import Row

import collections


In [2]:
# Create a SparkSession (Note, the config section is only for Windows!)
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()


In [3]:
def mapper(line):
    fields = line.split(',')
    return Row(ID=int(fields[0]), name=str(fields[1].encode("utf-8")), age=int(fields[2]), numFriends=int(fields[3]))

lines = spark.sparkContext.textFile("file:///home/hashimyousaf/spark-2.4.0-bin-hadoop2.7/bin/jupyter-scripts/dataset/friends/fakefriends.csv")
people = lines.map(mapper)
print(people.collect())

[Row(ID=0, age=33, name="b'Will'", numFriends=385), Row(ID=1, age=26, name="b'Jean-Luc'", numFriends=2), Row(ID=2, age=55, name="b'Hugh'", numFriends=221), Row(ID=3, age=40, name="b'Deanna'", numFriends=465), Row(ID=4, age=68, name="b'Quark'", numFriends=21), Row(ID=5, age=59, name="b'Weyoun'", numFriends=318), Row(ID=6, age=37, name="b'Gowron'", numFriends=220), Row(ID=7, age=54, name="b'Will'", numFriends=307), Row(ID=8, age=38, name="b'Jadzia'", numFriends=380), Row(ID=9, age=27, name="b'Hugh'", numFriends=181), Row(ID=10, age=53, name="b'Odo'", numFriends=191), Row(ID=11, age=57, name="b'Ben'", numFriends=372), Row(ID=12, age=54, name="b'Keiko'", numFriends=253), Row(ID=13, age=56, name="b'Jean-Luc'", numFriends=444), Row(ID=14, age=43, name="b'Hugh'", numFriends=49), Row(ID=15, age=36, name="b'Rom'", numFriends=49), Row(ID=16, age=22, name="b'Weyoun'", numFriends=323), Row(ID=17, age=35, name="b'Odo'", numFriends=13), Row(ID=18, age=45, name="b'Jean-Luc'", numFriends=455), Row(ID=

In [4]:

# Infer the schema, and register the DataFrame as a table.
schemaPeople = spark.createDataFrame(people).cache()
schemaPeople.createOrReplaceTempView("people")

In [5]:
# SQL can be run over DataFrames that have been registered as a table.
teenagers = spark.sql("SELECT * FROM people WHERE age >= 13 AND age <= 19")

In [6]:
# The results of SQL queries are RDDs and support all the normal RDD operations.
for teen in teenagers.collect():
  print(teen)

Row(ID=21, age=19, name="b'Miles'", numFriends=268)
Row(ID=52, age=19, name="b'Beverly'", numFriends=269)
Row(ID=54, age=19, name="b'Brunt'", numFriends=5)
Row(ID=106, age=18, name="b'Beverly'", numFriends=499)
Row(ID=115, age=18, name="b'Dukat'", numFriends=397)
Row(ID=133, age=19, name="b'Quark'", numFriends=265)
Row(ID=136, age=19, name="b'Will'", numFriends=335)
Row(ID=225, age=19, name="b'Elim'", numFriends=106)
Row(ID=304, age=19, name="b'Will'", numFriends=404)
Row(ID=341, age=18, name="b'Data'", numFriends=326)
Row(ID=366, age=19, name="b'Keiko'", numFriends=119)
Row(ID=373, age=19, name="b'Quark'", numFriends=272)
Row(ID=377, age=18, name="b'Beverly'", numFriends=418)
Row(ID=404, age=18, name="b'Kasidy'", numFriends=24)
Row(ID=409, age=19, name="b'Nog'", numFriends=267)
Row(ID=439, age=18, name="b'Data'", numFriends=417)
Row(ID=444, age=18, name="b'Keiko'", numFriends=472)
Row(ID=492, age=19, name="b'Dukat'", numFriends=36)
Row(ID=494, age=18, name="b'Kasidy'", numFriends=194)

In [7]:
# We can also use functions instead of SQL queries:
schemaPeople.groupBy("age").count().orderBy("age").show()

spark.stop()

+---+-----+
|age|count|
+---+-----+
| 18|    8|
| 19|   11|
| 20|    5|
| 21|    8|
| 22|    7|
| 23|   10|
| 24|    5|
| 25|   11|
| 26|   17|
| 27|    8|
| 28|   10|
| 29|   12|
| 30|   11|
| 31|    8|
| 32|   11|
| 33|   12|
| 34|    6|
| 35|    8|
| 36|   10|
| 37|    9|
+---+-----+
only showing top 20 rows

