In [21]:
import findspark
findspark.init()

import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession

In [22]:
spark = SparkSession.builder.master("local[*]").appName("thesparkapp").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

cores = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()
print(spark.sparkContext.appName)
print("You are working with", cores, "core(s)")


spark

thesparkapp
You are working with 1 core(s)


In [23]:
airbnb = spark.read.csv('nyc_air_bnb.csv',inferSchema=True,header=True)

In [24]:
airbnb.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- host_id: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- neighbourhood_group: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- price: string (nullable = true)
 |-- minimum_nights: string (nullable = true)
 |-- number_of_reviews: string (nullable = true)
 |-- last_review: string (nullable = true)
 |-- reviews_per_month: string (nullable = true)
 |-- calculated_host_listings_count: string (nullable = true)
 |-- availability_365: integer (nullable = true)



In [18]:
#spark.stop()

# Spark SQL

In [25]:
airbnb.createOrReplaceTempView("Tempview")

In [26]:
spark.sql("Select price,minimum_nights from Tempview").show()

+-----+--------------+
|price|minimum_nights|
+-----+--------------+
|  149|             1|
|  225|             1|
|  150|             3|
|   89|             1|
|   80|            10|
|  200|             3|
|   60|            45|
|   79|             2|
|   79|             2|
|  150|             1|
|  135|             5|
|   85|             2|
|   89|             4|
|   85|             2|
|  120|            90|
|  140|             2|
|  215|             2|
|  140|             1|
|   99|             3|
|  190|             7|
+-----+--------------+
only showing top 20 rows



In [27]:
airbnb.count(), spark.sql("Select count(*) from Tempview").show()

+--------+
|count(1)|
+--------+
|   49079|
+--------+



(49079, None)

In [28]:
df = spark.sql("Select price,minimum_nights from Tempview")

In [30]:
df.columns

['price', 'minimum_nights']

In [46]:
json_str=['{"price":"0", "minimum_nights":"1000"}',
         '{"price":"1000", "minimum_nights":"0"}']

In [47]:
newlineRdd = spark.sparkContext.parallelize(json_str)

In [48]:
morelines = spark.read.json(newlineRdd)

In [39]:
from pyspark.sql.functions import *

In [42]:
morelines = morelines.select(col("price"),col("minimum_nights"))

In [49]:
alldata = df.union(morelines)

In [53]:
alldata.filter("price < 1").show()

+---------+---------------+
|    price| minimum_nights|
+---------+---------------+
|        0|              4|
|        0|              2|
|        0|              2|
|        0|              2|
|        0|              5|
|        0|              1|
|        0|              1|
|        0|              1|
|        0|              3|
|        0|             30|
|        0|             30|
|-73.99986|   Private room|
|-74.00828|Entire home/apt|
|        0|           1000|
+---------+---------------+



In [55]:
alldata.filter("price > 999 and minimum_nights < 1").show()

+-----+--------------+
|price|minimum_nights|
+-----+--------------+
| 1000|             0|
+-----+--------------+



## Reading files with spark sql

In [63]:
#df2 = spark.sql("select * from format.`filepath.format`")
#df2 = spark.sql("select * from parquet.`filepath.parquet`")
df2 = spark.sql("select * from csv.`nyc_air_bnb.csv`")

In [64]:
df2.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: string (nullable = true)
 |-- _c12: string (nullable = true)
 |-- _c13: string (nullable = true)
 |-- _c14: string (nullable = true)
 |-- _c15: string (nullable = true)



# Load & Save

In [57]:
df1 = spark.read.load('nyc_air_bnb.csv',format="csv")
df1 = spark.read.option("header",True).csv('nyc_air_bnb.csv')

In [60]:
df1.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: string (nullable = true)
 |-- _c12: string (nullable = true)
 |-- _c13: string (nullable = true)
 |-- _c14: string (nullable = true)
 |-- _c15: string (nullable = true)



## Save

In [None]:
df1.write.save(path="path",format="parquet")

## SaveMode

In [68]:
#modes : append, error, ignore, overwrite
airbnb.write.mode("error").json("theairbnb")

In [69]:
df3 = spark.read.json("theairbnb")

In [70]:
df3.printSchema()

root
 |-- availability_365: long (nullable = true)
 |-- calculated_host_listings_count: string (nullable = true)
 |-- host_id: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- id: string (nullable = true)
 |-- last_review: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- minimum_nights: string (nullable = true)
 |-- name: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- neighbourhood_group: string (nullable = true)
 |-- number_of_reviews: string (nullable = true)
 |-- price: string (nullable = true)
 |-- reviews_per_month: string (nullable = true)
 |-- room_type: string (nullable = true)



## Persistent Table

In [66]:
airbnb.write.saveAsTable("airbnbTable")

In [67]:
spark.sql("select * from airbnbTable").show(2)

+----+--------------------+-------+---------+-------------------+-------------+--------+---------+---------------+-----+--------------+-----------------+-----------+-----------------+------------------------------+----------------+
|  id|                name|host_id|host_name|neighbourhood_group|neighbourhood|latitude|longitude|      room_type|price|minimum_nights|number_of_reviews|last_review|reviews_per_month|calculated_host_listings_count|availability_365|
+----+--------------------+-------+---------+-------------------+-------------+--------+---------+---------------+-----+--------------+-----------------+-----------+-----------------+------------------------------+----------------+
|2539|Clean & quiet apt...|   2787|     John|           Brooklyn|   Kensington|40.64749|-73.97237|   Private room|  149|             1|                9| 2018-10-19|             0.21|                             6|             365|
|2595|Skylit Midtown Ca...|   2845| Jennifer|          Manhattan|      M

## Partitionning

In [None]:
airbnb.write.partitionBy("columnname").saveAsTable("airbnbPartitions")
spark.sql("show partitions airbnbPartitions").show()

# Global Temporary View
Temporary views in Spark SQL are session-scoped and will disappear if the session that creates it terminates. If you want to have a temporary view that is shared among all sessions and keep alive until the Spark application terminates, you can create a global temporary view. Global temporary view is tied to a system preserved database global_temp, and we must use the qualified name to refer it, e.g. SELECT * FROM global_temp.view1.

In [14]:
airbnb.createGlobalTempView("globairbnb")

In [16]:
spark.newSession().sql("SELECT * FROM global_temp.globairbnb").show(1)

+----+--------------------+-------+---------+-------------------+-------------+--------+---------+------------+-----+--------------+-----------------+-----------+-----------------+------------------------------+----------------+
|  id|                name|host_id|host_name|neighbourhood_group|neighbourhood|latitude|longitude|   room_type|price|minimum_nights|number_of_reviews|last_review|reviews_per_month|calculated_host_listings_count|availability_365|
+----+--------------------+-------+---------+-------------------+-------------+--------+---------+------------+-----+--------------+-----------------+-----------+-----------------+------------------------------+----------------+
|2539|Clean & quiet apt...|   2787|     John|           Brooklyn|   Kensington|40.64749|-73.97237|Private room|  149|             1|                9| 2018-10-19|             0.21|                             6|             365|
+----+--------------------+-------+---------+-------------------+-------------+-----

# SCALA API

In [None]:
case class Nameclass(age:Integer, gender:String)
val data = spark.read.csv("filepath").as[Nameclass]

In [71]:
spark.stop()

# Broadcast functionality 

In [72]:
import findspark
findspark.init()

import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession

In [73]:
spark = SparkSession.builder.master("local[*]").appName("thesparkapp").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

cores = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()
print(spark.sparkContext.appName)
print("You are working with", cores, "core(s)")


spark

thesparkapp
You are working with 1 core(s)


In [74]:
iris= spark.read.csv('iris.csv',inferSchema=True,header=True)

In [77]:
iris.show()

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|species|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| setosa|
|         4.9|        3.0|         1.4|        0.2| setosa|
|         4.7|        3.2|         1.3|        0.2| setosa|
|         4.6|        3.1|         1.5|        0.2| setosa|
|         5.0|        3.6|         1.4|        0.2| setosa|
|         5.4|        3.9|         1.7|        0.4| setosa|
|         4.6|        3.4|         1.4|        0.3| setosa|
|         5.0|        3.4|         1.5|        0.2| setosa|
|         4.4|        2.9|         1.4|        0.2| setosa|
|         4.9|        3.1|         1.5|        0.1| setosa|
|         5.4|        3.7|         1.5|        0.2| setosa|
|         4.8|        3.4|         1.6|        0.2| setosa|
|         4.8|        3.0|         1.4|        0.1| setosa|
|         4.3|        3.0|         1.1| 

In [78]:
from pyspark.sql.types import StructType

In [79]:
schema = StructType().add("species","string").add("Id","integer")

In [80]:
speciestable = spark.createDataFrame([("setosa",1),("versicolor",2),("virginica",3)],schema=schema)

In [81]:
speciestable.show()

+----------+---+
|   species| Id|
+----------+---+
|    setosa|  1|
|versicolor|  2|
| virginica|  3|
+----------+---+



In [85]:
irisjoin = iris.join(speciestable,on='species')

In [96]:
from pyspark.sql.functions import col

In [100]:
irisjoin.filter(col("species")=="virginica").show(2)

+---------+------------+-----------+------------+-----------+---+
|  species|sepal_length|sepal_width|petal_length|petal_width| Id|
+---------+------------+-----------+------------+-----------+---+
|virginica|         5.9|        3.0|         5.1|        1.8|  3|
|virginica|         6.2|        3.4|         5.4|        2.3|  3|
+---------+------------+-----------+------------+-----------+---+
only showing top 2 rows



In [125]:
for i in speciestable.select(col("species")).toPandas()["species"].unique():
    irisjoin.filter(col("species")==i).show(2)

+-------+------------+-----------+------------+-----------+---+
|species|sepal_length|sepal_width|petal_length|petal_width| Id|
+-------+------------+-----------+------------+-----------+---+
| setosa|         5.0|        3.3|         1.4|        0.2|  1|
| setosa|         5.3|        3.7|         1.5|        0.2|  1|
+-------+------------+-----------+------------+-----------+---+
only showing top 2 rows

+----------+------------+-----------+------------+-----------+---+
|   species|sepal_length|sepal_width|petal_length|petal_width| Id|
+----------+------------+-----------+------------+-----------+---+
|versicolor|         5.7|        2.8|         4.1|        1.3|  2|
|versicolor|         5.1|        2.5|         3.0|        1.1|  2|
+----------+------------+-----------+------------+-----------+---+
only showing top 2 rows

+---------+------------+-----------+------------+-----------+---+
|  species|sepal_length|sepal_width|petal_length|petal_width| Id|
+---------+------------+--------

In [124]:
speciestable.select(col("species")).toPandas()["species"].unique()

array(['setosa', 'versicolor', 'virginica'], dtype=object)

### to add broadcast functionality

In [127]:
from pyspark.sql.functions import broadcast

In [128]:
speciestable = broadcast(spark.createDataFrame([("setosa",1),("versicolor",2),("virginica",3)],schema=schema))

In [129]:
irisjoin = iris.join(speciestable,on='species')

In [130]:
irisjoin.show()

+-------+------------+-----------+------------+-----------+---+
|species|sepal_length|sepal_width|petal_length|petal_width| Id|
+-------+------------+-----------+------------+-----------+---+
| setosa|         5.1|        3.5|         1.4|        0.2|  1|
| setosa|         4.9|        3.0|         1.4|        0.2|  1|
| setosa|         4.7|        3.2|         1.3|        0.2|  1|
| setosa|         4.6|        3.1|         1.5|        0.2|  1|
| setosa|         5.0|        3.6|         1.4|        0.2|  1|
| setosa|         5.4|        3.9|         1.7|        0.4|  1|
| setosa|         4.6|        3.4|         1.4|        0.3|  1|
| setosa|         5.0|        3.4|         1.5|        0.2|  1|
| setosa|         4.4|        2.9|         1.4|        0.2|  1|
| setosa|         4.9|        3.1|         1.5|        0.1|  1|
| setosa|         5.4|        3.7|         1.5|        0.2|  1|
| setosa|         4.8|        3.4|         1.6|        0.2|  1|
| setosa|         4.8|        3.0|      

In [131]:
irisjoin.explain()

== Physical Plan ==
*(2) Project [species#1280, sepal_length#1276, sepal_width#1277, petal_length#1278, petal_width#1279, Id#1574]
+- *(2) BroadcastHashJoin [species#1280], [species#1573], Inner, BuildRight
   :- *(2) Project [sepal_length#1276, sepal_width#1277, petal_length#1278, petal_width#1279, species#1280]
   :  +- *(2) Filter isnotnull(species#1280)
   :     +- FileScan csv [sepal_length#1276,sepal_width#1277,petal_length#1278,petal_width#1279,species#1280] Batched: false, DataFilters: [isnotnull(species#1280)], Format: CSV, Location: InMemoryFileIndex[file:/Users/livai/Desktop/CAPITAL/SPARK/iris.csv], PartitionFilters: [], PushedFilters: [IsNotNull(species)], ReadSchema: struct<sepal_length:double,sepal_width:double,petal_length:double,petal_width:double,species:string>
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false])), [id=#1027]
      +- *(1) Filter isnotnull(species#1573)
         +- *(1) Scan ExistingRDD[species#1573,Id#1574]




In [132]:
irisjoin.explain(extended=True)

== Parsed Logical Plan ==
'Join UsingJoin(Inner,Buffer(species))
:- Relation[sepal_length#1276,sepal_width#1277,petal_length#1278,petal_width#1279,species#1280] csv
+- ResolvedHint (strategy=broadcast)
   +- LogicalRDD [species#1573, Id#1574], false

== Analyzed Logical Plan ==
species: string, sepal_length: double, sepal_width: double, petal_length: double, petal_width: double, Id: int
Project [species#1280, sepal_length#1276, sepal_width#1277, petal_length#1278, petal_width#1279, Id#1574]
+- Join Inner, (species#1280 = species#1573)
   :- Relation[sepal_length#1276,sepal_width#1277,petal_length#1278,petal_width#1279,species#1280] csv
   +- ResolvedHint (strategy=broadcast)
      +- LogicalRDD [species#1573, Id#1574], false

== Optimized Logical Plan ==
Project [species#1280, sepal_length#1276, sepal_width#1277, petal_length#1278, petal_width#1279, Id#1574]
+- Join Inner, (species#1280 = species#1573), rightHint=(strategy=broadcast)
   :- Filter isnotnull(species#1280)
   :  +- Relati

```{python}
irisjoin = iris.join(speciestable,on='species')
irisjoin.explain()
irisjoin.explain(extended=True)```

```python
x = 'hello, python world!'
print(x.split(' '))
```

In [133]:
spark.stop()