In [5]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql import Row

# Create a SparkSession
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

def mapper(line):
    fields = line.split(',')
    return Row(ID=int(fields[0]), name=str(fields[1].encode("utf-8")), \
               age=int(fields[2]), numFriends=int(fields[3]))

lines = spark.sparkContext.textFile(r"Datasets\fakefriends.csv")
people = lines.map(mapper) # Create an RDD with Row object. Because the csv file does not have a structured data, we need to
                            # manually create Row object
print(lines.take(5))
print(people.take(5))
spark.stop()

['0,Will,33,385', '1,Jean-Luc,26,2', '2,Hugh,55,221', '3,Deanna,40,465', '4,Quark,68,21']
[Row(ID=0, name="b'Will'", age=33, numFriends=385), Row(ID=1, name="b'Jean-Luc'", age=26, numFriends=2), Row(ID=2, name="b'Hugh'", age=55, numFriends=221), Row(ID=3, name="b'Deanna'", age=40, numFriends=465), Row(ID=4, name="b'Quark'", age=68, numFriends=21)]


Notice that each line becomes a Row object. Now let's convert *people* into a DataFrame.

In [8]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql import Row

# Create a SparkSession
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

def mapper(line):
    fields = line.split(',')
    return Row(ID=int(fields[0]), name=str(fields[1].encode("utf-8")), \
               age=int(fields[2]), numFriends=int(fields[3]))

lines = spark.sparkContext.textFile(r"C:Datasets\fakefriends.csv")
people = lines.map(mapper)

# Infer the schema, and register the DataFrame as a table.
schemaPeople = spark.createDataFrame(people).cache()
schemaPeople.createOrReplaceTempView("people")

# SQL can be run over DataFrames that have been registered as a table.
teenagers = spark.sql("SELECT * FROM people WHERE age >= 13 AND age <= 19")

teenagers.show()
# The results of SQL queries are RDDs and support all the normal RDD operations.
for teen in teenagers.collect():
    print(teen)

# We can also use functions instead of SQL queries:
schemaPeople.groupBy("age").count().orderBy("age").show()

spark.stop()

+---+----------+---+----------+
| ID|      name|age|numFriends|
+---+----------+---+----------+
| 21|  b'Miles'| 19|       268|
| 52|b'Beverly'| 19|       269|
| 54|  b'Brunt'| 19|         5|
|106|b'Beverly'| 18|       499|
|115|  b'Dukat'| 18|       397|
|133|  b'Quark'| 19|       265|
|136|   b'Will'| 19|       335|
|225|   b'Elim'| 19|       106|
|304|   b'Will'| 19|       404|
|341|   b'Data'| 18|       326|
|366|  b'Keiko'| 19|       119|
|373|  b'Quark'| 19|       272|
|377|b'Beverly'| 18|       418|
|404| b'Kasidy'| 18|        24|
|409|    b'Nog'| 19|       267|
|439|   b'Data'| 18|       417|
|444|  b'Keiko'| 18|       472|
|492|  b'Dukat'| 19|        36|
|494| b'Kasidy'| 18|       194|
+---+----------+---+----------+

Row(ID=21, name="b'Miles'", age=19, numFriends=268)
Row(ID=52, name="b'Beverly'", age=19, numFriends=269)
Row(ID=54, name="b'Brunt'", age=19, numFriends=5)
Row(ID=106, name="b'Beverly'", age=18, numFriends=499)
Row(ID=115, name="b'Dukat'", age=18, numFriends=397)

Now, let's work with a csv file with structured data (i.e. with headers). In this case, we can easily create a DataFrame from the csv file.

In [9]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

people = spark.read.option("header", "true").option("inferSchema", "true").csv(r"Datasets\fakefriends-header.csv")


print("Here is our inferred schema:")
people.printSchema()

print("Let's display the name column:")
people.select("name").show()

print("Filter out anyone over 21:")
people.filter(people.age < 21).show()

print("Group by age:")
people.groupBy("age").count().show()

print("Make everyone 10 years older:")
people.select(people.name, people.age+10.0).show()

spark.stop()

Here is our inferred schema:
root
 |-- userID: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- friends: integer (nullable = true)

Let's display the name column:
+--------+
|    name|
+--------+
|    Will|
|Jean-Luc|
|    Hugh|
|  Deanna|
|   Quark|
|  Weyoun|
|  Gowron|
|    Will|
|  Jadzia|
|    Hugh|
|     Odo|
|     Ben|
|   Keiko|
|Jean-Luc|
|    Hugh|
|     Rom|
|  Weyoun|
|     Odo|
|Jean-Luc|
|  Geordi|
+--------+
only showing top 20 rows

Filter out anyone over 21:
+------+-------+---+-------+
|userID|   name|age|friends|
+------+-------+---+-------+
|    21|  Miles| 19|    268|
|    48|    Nog| 20|      1|
|    52|Beverly| 19|    269|
|    54|  Brunt| 19|      5|
|    60| Geordi| 20|    100|
|    73|  Brunt| 20|    384|
|   106|Beverly| 18|    499|
|   115|  Dukat| 18|    397|
|   133|  Quark| 19|    265|
|   136|   Will| 19|    335|
|   225|   Elim| 19|    106|
|   304|   Will| 19|    404|
|   327| Julian| 20|     63|
| 

**InferSchemca** means to determine the data type automatically based on the values given.

Let's now try to find the average number of friends by age.

In [12]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

people = spark.read.option("header", "true").option("inferSchema", "true").csv(r"Datasets\fakefriends-header.csv")
people.createOrReplaceTempView("people")

average_friends_by_age = spark.sql("SELECT age, AVG(friends) FROM people GROUP BY age ORDER BY age")
average_friends_by_age.show()

spark.stop()

+---+------------------+
|age|      avg(friends)|
+---+------------------+
| 18|           343.375|
| 19|213.27272727272728|
| 20|             165.0|
| 21|           350.875|
| 22|206.42857142857142|
| 23|             246.3|
| 24|             233.8|
| 25|197.45454545454547|
| 26|242.05882352941177|
| 27|           228.125|
| 28|             209.1|
| 29|215.91666666666666|
| 30| 235.8181818181818|
| 31|            267.25|
| 32| 207.9090909090909|
| 33| 325.3333333333333|
| 34|             245.5|
| 35|           211.625|
| 36|             246.6|
| 37|249.33333333333334|
+---+------------------+
only showing top 20 rows

