SparkContext is the entry point of a Spark application. SparkContext and HiveContext are used to interact with Spark and Hive functionality respectively, and SparkConf holds the configuration parameters for the cluster (for example, the master URL and the application name).

In [188]:
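# Helper functions
def head(df, no=5):
    # Render the first `no` rows; display() is provided by the notebook environment (IPython/Jupyter or Databricks)
    display(df.limit(no))

def shape(df):
    # pandas-style (rows, columns); count() triggers a Spark job over the whole DataFrame
    return (df.count(), len(df.columns))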

In [1]:
from pyspark import SparkContext, SparkConf

In [2]:
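conf = SparkConf().setMaster('local')  # 'local' = a single worker thread
#sc   = SparkContext(conf=conf)       # uncomment to use the conf above (1 core)
sc   = SparkContext()                  # no master set here: uses the default (typically local[*], i.e. all local cores)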

rdd1 = sc.parallelize(range(100))
print(rdd1.count())

100


In [3]:
sc

### Start a SparkSession, where we can try out DataFrame operations that apply to both batch and streaming processing.

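SparkSession (introduced in Spark 2.0) is a single entry point that encapsulates SparkContext, SQLContext and HiveContext. Behind the scenes it creates a SparkContext, or reuses the active one, which is available as `spark.sparkContext`; Hive support is turned on with `.enableHiveSupport()` on the builder.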

In [10]:
from pyspark.sql import SparkSession

In [11]:
spark = SparkSession.builder.appName("Spark Basic Operations").getOrCreate()

In [13]:
sc

In [16]:
spark

### Creating a DataFrame from scratch

In [19]:
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, StringType, LongType

In [21]:
schema = StructType([
    StructField(name="city",dataType=StringType(),nullable=True),
    StructField(name="country",dataType=StringType(),nullable=True),
    StructField(name="number",dataType=LongType(),nullable=False),
])

In [22]:
rows = [
    Row("Jaipur","India",1),
    Row("Ajmer","India",2)]

In [23]:
parallelizeRows = spark.sparkContext.parallelize(rows)

In [25]:
df = spark.createDataFrame(parallelizeRows, schema)

In [31]:
df.show()

+------+-------+------+
|  city|country|number|
+------+-------+------+
|Jaipur|  India|     1|
| Ajmer|  India|     2|
+------+-------+------+



## Create a DataFrame from a file

In [54]:
df = spark.read.csv("weather.csv",header=True, inferSchema=True)

In [55]:
df.show()

+-------------------+--------------------+-----------+--------+---------+----------+------------+-----------+
|           datetime|apparent_temperature|temperature|humidity|dew_point|wind_speed|wind_bearing|cloud_cover|
+-------------------+--------------------+-----------+--------+---------+----------+------------+-----------+
|2011-01-01 00:00:00|                8.44|       9.68|    0.95|     8.95|         0|           0|          0|
|2011-01-01 00:15:00|              8.2575|     9.4825|  0.9625|    8.935|         0|           0|          0|
|2011-01-01 00:30:00|               8.075|      9.285|   0.975|     8.92|         0|           0|          0|
|2011-01-01 00:45:00|              7.8925|     9.0875|  0.9875|    8.905|         0|           0|          0|
|2011-01-01 01:00:00|                7.71|       8.89|     1.0|     8.89|         0|           0|          0|
|2011-01-01 01:15:00|                7.71|       8.89|     1.0|     8.89|         0|           0|          0|
... (output truncated)

#### Create a lazily evaluated "view" that we can use in Spark SQL

In [56]:
df.createOrReplaceTempView("df_table")

In [58]:
# To check schema of df
df.printSchema()

root
 |-- datetime: string (nullable = true)
 |-- apparent_temperature: double (nullable = true)
 |-- temperature: double (nullable = true)
 |-- humidity: double (nullable = true)
 |-- dew_point: double (nullable = true)
 |-- wind_speed: integer (nullable = true)
 |-- wind_bearing: integer (nullable = true)
 |-- cloud_cover: integer (nullable = true)



In [59]:
type(df)

pyspark.sql.dataframe.DataFrame

In [62]:
spark.sql("show tables").show()

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
|        | df_table|       true|
+--------+---------+-----------+



### Manipulating Columns
Columns in Spark are similar to columns in pandas: we can select, transform and remove them using expressions.

A column cannot be manipulated outside the context of a DataFrame: a Column object is only an expression, so to modify a column we apply a transformation (such as `select` or `withColumn`) to the DataFrame, which returns a new DataFrame.

In [63]:
import pyspark.sql.functions as F

Now that we have a DataFrame, we can use `select` for column names or Column expressions, and `selectExpr` for expressions written as SQL strings.

In [68]:
df.select("temperature").show(2)

+-----------+
|temperature|
+-----------+
|       9.68|
|     9.4825|
+-----------+
only showing top 2 rows



In [69]:
df.select(F.col("temperature")).show(2)

+-----------+
|temperature|
+-----------+
|       9.68|
|     9.4825|
+-----------+
only showing top 2 rows



In [70]:
df.select(df.temperature).show(2)

+-----------+
|temperature|
+-----------+
|       9.68|
|     9.4825|
+-----------+
only showing top 2 rows



In [71]:
df.select("temperature","humidity").show(2)

+-----------+--------+
|temperature|humidity|
+-----------+--------+
|       9.68|    0.95|
|     9.4825|  0.9625|
+-----------+--------+
only showing top 2 rows



In [72]:
df.select(["temperature","humidity"]).show(2)

+-----------+--------+
|temperature|humidity|
+-----------+--------+
|       9.68|    0.95|
|     9.4825|  0.9625|
+-----------+--------+
only showing top 2 rows



In [83]:
# Rename a column inside a SQL expression
df.select(F.expr("temperature as t")).show(2)

+------+
|     t|
+------+
|  9.68|
|9.4825|
+------+
only showing top 2 rows



In [84]:
df.select(F.expr("temperature as t"),
         F.expr("humidity as h")).show(2)

+------+------+
|     t|     h|
+------+------+
|  9.68|  0.95|
|9.4825|0.9625|
+------+------+
only showing top 2 rows



In [92]:
# Try to rename the column and then change it back to its original name
df.select('temperature').alias("t").show(2) # Does not rename the column: DataFrame.alias() names the DataFrame (e.g. for joins)

+-----------+
|temperature|
+-----------+
|       9.68|
|     9.4825|
+-----------+
only showing top 2 rows



In [99]:
df.select(F.expr('temperature as t')).alias("temperature").show(2) # Same issue: the column stays "t", only the DataFrame is aliased

+------+
|     t|
+------+
|  9.68|
|9.4825|
+------+
only showing top 2 rows



`selectExpr` is a variant of `select` that accepts SQL expression strings, which makes it convenient for building more complex expressions and creating new DataFrames from them.

In [102]:
# Create df2 from df with two columns: temp (an alias of temperature) and temperature
df2 = df.selectExpr("temperature as temp", "temperature")

In [103]:
df2.show(2)

+------+-----------+
|  temp|temperature|
+------+-----------+
|  9.68|       9.68|
|9.4825|     9.4825|
+------+-----------+
only showing top 2 rows



In [105]:
df.selectExpr("avg(temperature)","count(distinct(humidity))").show()

+-----------------+------------------------+
| avg(temperature)|count(DISTINCT humidity)|
+-----------------+------------------------+
|24.56041067351691|                    1556|
+-----------------+------------------------+



Passing explicit values with literals

In [107]:
df.select(F.expr("*"), F.lit(1).alias("One")).show(2)

+-------------------+--------------------+-----------+--------+---------+----------+------------+-----------+---+
|           datetime|apparent_temperature|temperature|humidity|dew_point|wind_speed|wind_bearing|cloud_cover|One|
+-------------------+--------------------+-----------+--------+---------+----------+------------+-----------+---+
|2011-01-01 00:00:00|                8.44|       9.68|    0.95|     8.95|         0|           0|          0|  1|
|2011-01-01 00:15:00|              8.2575|     9.4825|  0.9625|    8.935|         0|           0|          0|  1|
+-------------------+--------------------+-----------+--------+---------+----------+------------+-----------+---+
only showing top 2 rows



Adding a column

In [110]:
df.withColumn("One",F.lit(1)).show(2)

+-------------------+--------------------+-----------+--------+---------+----------+------------+-----------+---+
|           datetime|apparent_temperature|temperature|humidity|dew_point|wind_speed|wind_bearing|cloud_cover|One|
+-------------------+--------------------+-----------+--------+---------+----------+------------+-----------+---+
|2011-01-01 00:00:00|                8.44|       9.68|    0.95|     8.95|         0|           0|          0|  1|
|2011-01-01 00:15:00|              8.2575|     9.4825|  0.9625|    8.935|         0|           0|          0|  1|
+-------------------+--------------------+-----------+--------+---------+----------+------------+-----------+---+
only showing top 2 rows



Renaming a column (another way, with withColumnRenamed)

In [118]:
df = df.withColumnRenamed("temperature","new_temp")

In [119]:
df.columns

['datetime',
 'apparent_temperature',
 'new_temp',
 'humidity',
 'dew_point',
 'wind_speed',
 'wind_bearing',
 'cloud_cover']

In [120]:
df = df.withColumnRenamed("new_temp","temperature")
df.columns

['datetime',
 'apparent_temperature',
 'temperature',
 'humidity',
 'dew_point',
 'wind_speed',
 'wind_bearing',
 'cloud_cover']

Removing columns

In [125]:
df.drop("cloud_cover").columns

['datetime',
 'apparent_temperature',
 'temperature',
 'humidity',
 'dew_point',
 'wind_speed',
 'wind_bearing']

DataFrame filtering (`filter` and `where` are aliases)

In [184]:
df.filter(F.col("temperature") < 12).select("datetime",
                                            "temperature").show(3)

+-------------------+-----------+
|           datetime|temperature|
+-------------------+-----------+
|2011-01-01 00:00:00|       9.68|
|2011-01-01 00:15:00|     9.4825|
|2011-01-01 00:30:00|      9.285|
+-------------------+-----------+
only showing top 3 rows



In [186]:
df.where((F.col("temperature") < 12) & 
         (F.col("temperature") > 9.5)).select("datetime",
                                          "temperature").show(3)

+-------------------+-----------+
|           datetime|temperature|
+-------------------+-----------+
|2011-01-01 00:00:00|       9.68|
|2011-01-01 03:45:00|      9.725|
|2011-01-01 04:00:00|       10.0|
+-------------------+-----------+
only showing top 3 rows



In [146]:
(df.where(F.col("temperature") < 12)
   .where(F.col("temperature") > 9.5)
   .select("datetime", "temperature")
   .show(3))

+-------------------+-----------+
|           datetime|temperature|
+-------------------+-----------+
|2011-01-01 00:00:00|       9.68|
|2011-01-01 03:45:00|      9.725|
|2011-01-01 04:00:00|       10.0|
+-------------------+-----------+
only showing top 3 rows



Get distinct rows (here, the number of distinct temperature values)

In [150]:
df.select("temperature").distinct().count()

7422

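Get random samples. `fraction` is the expected fraction of rows rather than an exact count; with `fraction=1.0` and no replacement every row is kept, which is why the output below starts with the first rows of the file.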

In [167]:
df.sample(withReplacement=False, fraction=1.0, seed=48).show(3)

+-------------------+--------------------+-----------+--------+---------+----------+------------+-----------+
|           datetime|apparent_temperature|temperature|humidity|dew_point|wind_speed|wind_bearing|cloud_cover|
+-------------------+--------------------+-----------+--------+---------+----------+------------+-----------+
|2011-01-01 00:00:00|                8.44|       9.68|    0.95|     8.95|         0|           0|          0|
|2011-01-01 00:15:00|              8.2575|     9.4825|  0.9625|    8.935|         0|           0|          0|
|2011-01-01 00:30:00|               8.075|      9.285|   0.975|     8.92|         0|           0|          0|
+-------------------+--------------------+-----------+--------+---------+----------+------------+-----------+
only showing top 3 rows



Random splits

In [170]:
a, b = df.randomSplit([0.6,0.4], seed=34)

In [189]:
df.show(1)

+-------------------+--------------------+-----------+--------+---------+----------+------------+-----------+
|           datetime|apparent_temperature|temperature|humidity|dew_point|wind_speed|wind_bearing|cloud_cover|
+-------------------+--------------------+-----------+--------+---------+----------+------------+-----------+
|2011-01-01 00:00:00|                8.44|       9.68|    0.95|     8.95|         0|           0|          0|
+-------------------+--------------------+-----------+--------+---------+----------+------------+-----------+
only showing top 1 row



In [190]:
shape(a)

(10502, 8)

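DataFrames (like the RDDs underneath them) are immutable, so rows cannot be appended to an existing DataFrame in place; `union` instead returns a new DataFrame containing the rows of both inputs.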

#### Concatenating and appending rows

In [195]:
new_df = df.union(a)

In [197]:
shape(df), shape(a), shape(new_df)

((17520, 8), (10502, 8), (28022, 8))

In [204]:
new_df.createOrReplaceTempView("new_view")

In [207]:
spark.sql("show tables").show()

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
|        | df_table|       true|
|        | new_view|       true|
+--------+---------+-----------+



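Sorting (`sort` and `orderBy` are aliases). Since `datetime` was read as a string, it sorts lexicographically, which matches chronological order here because the values use the zero-padded `yyyy-MM-dd HH:mm:ss` format.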

In [209]:
df.sort("datetime").show(2)

+-------------------+--------------------+-----------+--------+---------+----------+------------+-----------+
|           datetime|apparent_temperature|temperature|humidity|dew_point|wind_speed|wind_bearing|cloud_cover|
+-------------------+--------------------+-----------+--------+---------+----------+------------+-----------+
|2011-01-01 00:00:00|                8.44|       9.68|    0.95|     8.95|         0|           0|          0|
|2011-01-01 00:15:00|              8.2575|     9.4825|  0.9625|    8.935|         0|           0|          0|
+-------------------+--------------------+-----------+--------+---------+----------+------------+-----------+
only showing top 2 rows



In [210]:
df.sort(F.desc("datetime")).show(2)

+-------------------+--------------------+-----------+--------+---------+----------+------------+-----------+
|           datetime|apparent_temperature|temperature|humidity|dew_point|wind_speed|wind_bearing|cloud_cover|
+-------------------+--------------------+-----------+--------+---------+----------+------------+-----------+
|2011-07-02 11:45:00|             39.8675|       34.7|   0.495|  22.6175|         0|           0|          0|
|2011-07-02 11:30:00|              39.405|      34.43|     0.5|   22.515|         0|           0|          0|
+-------------------+--------------------+-----------+--------+---------+----------+------------+-----------+
only showing top 2 rows



In [211]:
df.orderBy("datetime").show(2)

+-------------------+--------------------+-----------+--------+---------+----------+------------+-----------+
|           datetime|apparent_temperature|temperature|humidity|dew_point|wind_speed|wind_bearing|cloud_cover|
+-------------------+--------------------+-----------+--------+---------+----------+------------+-----------+
|2011-01-01 00:00:00|                8.44|       9.68|    0.95|     8.95|         0|           0|          0|
|2011-01-01 00:15:00|              8.2575|     9.4825|  0.9625|    8.935|         0|           0|          0|
+-------------------+--------------------+-----------+--------+---------+----------+------------+-----------+
only showing top 2 rows



In [214]:
df.orderBy(['datetime','temperature']).show(2)

+-------------------+--------------------+-----------+--------+---------+----------+------------+-----------+
|           datetime|apparent_temperature|temperature|humidity|dew_point|wind_speed|wind_bearing|cloud_cover|
+-------------------+--------------------+-----------+--------+---------+----------+------------+-----------+
|2011-01-01 00:00:00|                8.44|       9.68|    0.95|     8.95|         0|           0|          0|
|2011-01-01 00:15:00|              8.2575|     9.4825|  0.9625|    8.935|         0|           0|          0|
+-------------------+--------------------+-----------+--------+---------+----------+------------+-----------+
only showing top 2 rows



In [217]:
df.sort(F.desc('datetime'),F.asc('temperature')).show(2)

+-------------------+--------------------+-----------+--------+---------+----------+------------+-----------+
|           datetime|apparent_temperature|temperature|humidity|dew_point|wind_speed|wind_bearing|cloud_cover|
+-------------------+--------------------+-----------+--------+---------+----------+------------+-----------+
|2011-07-02 11:45:00|             39.8675|       34.7|   0.495|  22.6175|         0|           0|          0|
|2011-07-02 11:30:00|              39.405|      34.43|     0.5|   22.515|         0|           0|          0|
+-------------------+--------------------+-----------+--------+---------+----------+------------+-----------+
only showing top 2 rows



Limiting what we extract from a dataframe

In [218]:
df.limit(3).show()

+-------------------+--------------------+-----------+--------+---------+----------+------------+-----------+
|           datetime|apparent_temperature|temperature|humidity|dew_point|wind_speed|wind_bearing|cloud_cover|
+-------------------+--------------------+-----------+--------+---------+----------+------------+-----------+
|2011-01-01 00:00:00|                8.44|       9.68|    0.95|     8.95|         0|           0|          0|
|2011-01-01 00:15:00|              8.2575|     9.4825|  0.9625|    8.935|         0|           0|          0|
|2011-01-01 00:30:00|               8.075|      9.285|   0.975|     8.92|         0|           0|          0|
+-------------------+--------------------+-----------+--------+---------+----------+------------+-----------+

