## In this first example, we are creating a dataframe from an RDD, because the csv does not have headers

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import Row

# Create a SparkSession
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()


In [3]:

#doing it this way because the data is not structured nicely 
def mapper(line):
    fields = line.split(',')
    return Row(ID=int(fields[0]), name=str(fields[1].encode("utf-8")), \
               age=int(fields[2]), numFriends=int(fields[3]))

lines = spark.sparkContext.textFile("fakefriends.csv")
people = lines.map(mapper)

# Infer the schema, and register the DataFrame as a table.
schemaPeople = spark.createDataFrame(people).cache()

#create view turns the data into a queriably table 
schemaPeople.createOrReplaceTempView("people")

# SQL can be run over DataFrames that have been registered as a table.
teenagers = spark.sql("SELECT * FROM people WHERE age >= 13 AND age <= 19")

# The results of SQL queries are RDDs and support all the normal RDD operations.
for teen in teenagers.collect():
  print(teen)

Row(ID=21, name="b'Miles'", age=19, numFriends=268)
Row(ID=52, name="b'Beverly'", age=19, numFriends=269)
Row(ID=54, name="b'Brunt'", age=19, numFriends=5)
Row(ID=106, name="b'Beverly'", age=18, numFriends=499)
Row(ID=115, name="b'Dukat'", age=18, numFriends=397)
Row(ID=133, name="b'Quark'", age=19, numFriends=265)
Row(ID=136, name="b'Will'", age=19, numFriends=335)
Row(ID=225, name="b'Elim'", age=19, numFriends=106)
Row(ID=304, name="b'Will'", age=19, numFriends=404)
Row(ID=341, name="b'Data'", age=18, numFriends=326)
Row(ID=366, name="b'Keiko'", age=19, numFriends=119)
Row(ID=373, name="b'Quark'", age=19, numFriends=272)
Row(ID=377, name="b'Beverly'", age=18, numFriends=418)
Row(ID=404, name="b'Kasidy'", age=18, numFriends=24)
Row(ID=409, name="b'Nog'", age=19, numFriends=267)
Row(ID=439, name="b'Data'", age=18, numFriends=417)
Row(ID=444, name="b'Keiko'", age=18, numFriends=472)
Row(ID=492, name="b'Dukat'", age=19, numFriends=36)
Row(ID=494, name="b'Kasidy'", age=18, numFriends=194)

In [9]:
# We can also use functions instead of SQL queries:
schemaPeople.groupBy("age").count().orderBy("age").show()

+---+-----+
|age|count|
+---+-----+
| 18|    8|
| 19|   11|
| 20|    5|
| 21|    8|
| 22|    7|
| 23|   10|
| 24|    5|
| 25|   11|
| 26|   17|
| 27|    8|
| 28|   10|
| 29|   12|
| 30|   11|
| 31|    8|
| 32|   11|
| 33|   12|
| 34|    6|
| 35|    8|
| 36|   10|
| 37|    9|
+---+-----+
only showing top 20 rows



In [14]:
spark.stop()

## create a dataframe directly 

In [15]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

In [17]:
#infer schema tells spark to try and figure out the data type 
people = spark.read.option("header", "true").option("inferSchema", "true")\
    .csv("fakefriends-header.csv")
    
#in order to to write sql with actual sql code, need to create a view, which is stored somewhere on computer.
# but, can write code like sql without having to do that 
    
print("Here is our inferred schema:")
people.printSchema()

print("Let's display the name column:")
people.select("name").show()

print("Filter out anyone over 21:")
people.filter(people.age < 21).show()

#can put a number inside of the show 
print("Group by age")
people.groupBy("age").count().show(30)

print("Make everyone 10 years older:")
people.select(people.name, people.age + 10).show()

Here is our inferred schema:
root
 |-- userID: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- friends: integer (nullable = true)

Let's display the name column:
+--------+
|    name|
+--------+
|    Will|
|Jean-Luc|
|    Hugh|
|  Deanna|
|   Quark|
|  Weyoun|
|  Gowron|
|    Will|
|  Jadzia|
|    Hugh|
|     Odo|
|     Ben|
|   Keiko|
|Jean-Luc|
|    Hugh|
|     Rom|
|  Weyoun|
|     Odo|
|Jean-Luc|
|  Geordi|
+--------+
only showing top 20 rows

Filter out anyone over 21:
+------+-------+---+-------+
|userID|   name|age|friends|
+------+-------+---+-------+
|    21|  Miles| 19|    268|
|    48|    Nog| 20|      1|
|    52|Beverly| 19|    269|
|    54|  Brunt| 19|      5|
|    60| Geordi| 20|    100|
|    73|  Brunt| 20|    384|
|   106|Beverly| 18|    499|
|   115|  Dukat| 18|    397|
|   133|  Quark| 19|    265|
|   136|   Will| 19|    335|
|   225|   Elim| 19|    106|
|   304|   Will| 19|    404|
|   327| Julian| 20|     63|
| 

In [55]:
spark.stop()

## Average number of friends per person 

In [21]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

In [22]:
people = spark.read.option("header", "true").option("inferSchema", "true")\
    .csv("fakefriends-header.csv")

In [48]:
people.groupBy('age').mean('friends')\
    .withColumnRenamed("avg(friends)", "friends")\
    .orderBy('friends', ascending = False).show(5)

+---+-----------------+
|age|          friends|
+---+-----------------+
| 63|            384.0|
| 21|          350.875|
| 18|          343.375|
| 52|340.6363636363636|
| 33|325.3333333333333|
+---+-----------------+
only showing top 5 rows



In [54]:
#use the .agg() to add more flexibility 
#alias goes withing the .agg()
from pyspark.sql import functions as func
people.groupBy('age').agg(func.round(func.avg('friends'),2).alias('test')).orderBy('age').show(5)


+---+------+
|age|  test|
+---+------+
| 18|343.38|
| 19|213.27|
| 20| 165.0|
| 21|350.88|
| 22|206.43|
+---+------+
only showing top 5 rows



In [4]:
spark.stop()