In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import *

In [2]:
#creating SparkSession
sc = SparkSession\
    .builder\
    .master("local[1]")\
    .appName('example_spark')\
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/16 19:05:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/12/16 19:05:10 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
#manually adding data and headers then creating a dataframe
data1 = [
    (1, "Gilbert Gathara", "24", "Nairobi"), 
    (2, "Someone Else", "32", "Nowhere")
]
headers = ("id", "Name", "Age", "Location")
df1 = sc.createDataFrame(data1, headers)
df1.show()

                                                                                

+---+---------------+---+--------+
| id|           Name|Age|Location|
+---+---------------+---+--------+
|  1|Gilbert Gathara| 24| Nairobi|
|  2|   Someone Else| 32| Nowhere|
+---+---------------+---+--------+



RDD Creation

In [4]:
#creates a rdd from parallelize - used to create a RDD from a list
dataList = [("Java", 20000), ("Python", 100000), ("Scala", 100)]
rdd=sc.sparkContext.parallelize(dataList)

In [None]:
##RDD Operations - 

In [5]:
#reading a csv file with header
df2 = sc.read.option("header", "True").csv("AB_NYC_2019.csv")
df2.show()

+----+--------------------+-------+----------------+-------------------+------------------+--------+---------+---------------+-----+--------------+-----------------+-----------+-----------------+------------------------------+----------------+
|  id|                name|host_id|       host_name|neighbourhood_group|     neighbourhood|latitude|longitude|      room_type|price|minimum_nights|number_of_reviews|last_review|reviews_per_month|calculated_host_listings_count|availability_365|
+----+--------------------+-------+----------------+-------------------+------------------+--------+---------+---------------+-----+--------------+-----------------+-----------+-----------------+------------------------------+----------------+
|2539|Clean & quiet apt...|   2787|            John|           Brooklyn|        Kensington|40.64749|-73.97237|   Private room|  149|             1|                9| 2018-10-19|             0.21|                             6|             365|
|2595|Skylit Midtown Ca.

In [6]:
#renaming column and printing updated schema again
df2.withColumnRenamed("name", "address").printSchema()

root
 |-- id: string (nullable = true)
 |-- address: string (nullable = true)
 |-- host_id: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- neighbourhood_group: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- price: string (nullable = true)
 |-- minimum_nights: string (nullable = true)
 |-- number_of_reviews: string (nullable = true)
 |-- last_review: string (nullable = true)
 |-- reviews_per_month: string (nullable = true)
 |-- calculated_host_listings_count: string (nullable = true)
 |-- availability_365: string (nullable = true)



PySpark SQL

In [29]:
dfsql = sc.read.option("header", "True").csv("AB_NYC_2019.csv")


#creates a temporary table on dataframe using createOrReplaceTempView()
dfsql.createOrReplaceTempView("airbnb")

#once created, it can be accessed through sql()
sc.sql("SELECT * FROM airbnb") \
    .show(5)
sc.sql("SELECT neighbourhood_group, price FROM airbnb") \
    .show(5)

+----+--------------------+-------+-----------+-------------------+-------------+--------+---------+---------------+-----+--------------+-----------------+-----------+-----------------+------------------------------+----------------+
|  id|                name|host_id|  host_name|neighbourhood_group|neighbourhood|latitude|longitude|      room_type|price|minimum_nights|number_of_reviews|last_review|reviews_per_month|calculated_host_listings_count|availability_365|
+----+--------------------+-------+-----------+-------------------+-------------+--------+---------+---------------+-----+--------------+-----------------+-----------+-----------------+------------------------------+----------------+
|2539|Clean & quiet apt...|   2787|       John|           Brooklyn|   Kensington|40.64749|-73.97237|   Private room|  149|             1|                9| 2018-10-19|             0.21|                             6|             365|
|2595|Skylit Midtown Ca...|   2845|   Jennifer|          Manhatt

In [31]:
#filtering rows using WHERE
sc.sql("SELECT name, host_name, neighbourhood FROM airbnb WHERE neighbourhood = 'Upper West Side' ") \
    .show(5)

#can be used as where() from dataframe
dfsql.select("name","host_name","room_type","neighbourhood") \
  .where("neighbourhood == 'Upper East Side'") \
  .show(5)


+--------------------+---------+---------------+
|                name|host_name|  neighbourhood|
+--------------------+---------+---------------+
|Cozy Clean Guest ...|MaryEllen|Upper West Side|
|Beautiful 1br on ...|     Lena|Upper West Side|
|Wonderful Guest B...|  Claudio|Upper West Side|
|   West Side Retreat|      Cyn|Upper West Side|
|Cozy 1BD on Centr...|     Joya|Upper West Side|
+--------------------+---------+---------------+
only showing top 5 rows

+--------------------+---------+---------------+---------------+
|                name|host_name|      room_type|  neighbourhood|
+--------------------+---------+---------------+---------------+
|2 bedroom - Upper...|        D|Entire home/apt|Upper East Side|
|      Manhattan Room| Victoria|   Private room|Upper East Side|
|Garden studio in ...|     Lisa|Entire home/apt|Upper East Side|
|Light and Airy Up...|     Sara|Entire home/apt|Upper East Side|
|CENTRAL PARK LOFT...|    Marie|Entire home/apt|Upper East Side|
+-------------

In [33]:
dfsql.groupBy("neighbourhood").count() \
  .show(5)

sc.sql(""" SELECT neighbourhood_group, count(*) as count FROM airbnb 
          GROUP BY neighbourhood_group""") \
     .show(5)

+-------------+-----+
|neighbourhood|count|
+-------------+-----+
|       Corona|   64|
| Richmondtown|    1|
| Prince's Bay|    4|
|  Westerleigh|    2|
|   Mill Basin|    4|
+-------------+-----+
only showing top 5 rows

+-------------------+-----+
|neighbourhood_group|count|
+-------------------+-----+
|         Douglaston|    1|
|             Queens| 5630|
|              Nadia|    1|
|            Midtown|    4|
|    Jackson Heights|    2|
+-------------------+-----+
only showing top 5 rows

