In [1]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
# Build (or reuse) a local SparkSession. Constructing SparkContext('local')
# directly fails with "Cannot run multiple SparkContexts at once" when this
# cell is re-executed; getOrCreate() returns the already-running session instead.
spark = SparkSession.builder.master('local').getOrCreate()
# Keep the `sc` handle available for any RDD-level work.
sc = spark.sparkContext

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
from pyspark.sql.types import *

In [4]:

# Location of the headerless orders CSV (the first parsed row is order id 1).
ORDERS_CSV_PATH = '/user/bigdatapedia/data/orders.csv'

# NOTE: file-based sources such as CSV force every column to nullable = true
# (see printSchema below), so nullable=False would NOT reject missing values.
# The schema is declared honestly here; nulls are checked explicitly in the
# "Missing Values" section instead.
schema = StructType(
    [
        StructField('id', IntegerType(), nullable=True),
        StructField('order_date', DateType(), nullable=True),
        StructField('amount', IntegerType(), nullable=True),
        StructField('status', StringType(), nullable=True),
    ]
)

df = spark.read.csv(path=ORDERS_CSV_PATH, schema=schema, header=False)

In [5]:
# Print the schema Spark actually applied; every column reports nullable = true.
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- order_date: date (nullable = true)
 |-- amount: integer (nullable = true)
 |-- status: string (nullable = true)



In [6]:
# Preview the first 20 rows (the same count show() uses by default).
df.show(n=20)

[Stage 0:>                                                          (0 + 1) / 1]

+---+----------+------+---------------+
| id|order_date|amount|         status|
+---+----------+------+---------------+
|  1|2013-07-25| 11599|         CLOSED|
|  2|2013-07-25|   256|PENDING_PAYMENT|
|  3|2013-07-25| 12111|       COMPLETE|
|  4|2013-07-25|  8827|         CLOSED|
|  5|2013-07-25| 11318|       COMPLETE|
|  6|2013-07-25|  7130|       COMPLETE|
|  7|2013-07-25|  4530|       COMPLETE|
|  8|2013-07-25|  2911|     PROCESSING|
|  9|2013-07-25|  5657|PENDING_PAYMENT|
| 10|2013-07-25|  5648|PENDING_PAYMENT|
| 11|2013-07-25|   918| PAYMENT_REVIEW|
| 12|2013-07-25|  1837|         CLOSED|
| 13|2013-07-25|  9149|PENDING_PAYMENT|
| 14|2013-07-25|  9842|     PROCESSING|
| 15|2013-07-25|  2568|       COMPLETE|
| 16|2013-07-25|  7276|PENDING_PAYMENT|
| 17|2013-07-25|  2667|       COMPLETE|
| 18|2013-07-25|  1205|         CLOSED|
| 19|2013-07-25|  9488|PENDING_PAYMENT|
| 20|2013-07-25|  9198|     PROCESSING|
+---+----------+------+---------------+
only showing top 20 rows



                                                                                

**Descriptive Statistics**

In [7]:
# count / mean / stddev / min / max per column (order_date is not included in the summary).
summary_df = df.describe()
summary_df.show()

[Stage 1:>                                                          (0 + 1) / 1]

+-------+------------------+-----------------+--------+
|summary|                id|           amount|  status|
+-------+------------------+-----------------+--------+
|  count|             68883|            68883|   68883|
|   mean|           34442.0|6216.571098819738|    null|
| stddev|19884.953633337947|3586.205241263963|    null|
|    min|                 1|                1|CANCELED|
|    max|             68883|            12435|   test4|
+-------+------------------+-----------------+--------+



                                                                                

**Show specific columns**

In [8]:
# Project only the date and status columns.
df.select(['order_date', 'status']).show()

+----------+---------------+
|order_date|         status|
+----------+---------------+
|2013-07-25|         CLOSED|
|2013-07-25|PENDING_PAYMENT|
|2013-07-25|       COMPLETE|
|2013-07-25|         CLOSED|
|2013-07-25|       COMPLETE|
|2013-07-25|       COMPLETE|
|2013-07-25|       COMPLETE|
|2013-07-25|     PROCESSING|
|2013-07-25|PENDING_PAYMENT|
|2013-07-25|PENDING_PAYMENT|
|2013-07-25| PAYMENT_REVIEW|
|2013-07-25|         CLOSED|
|2013-07-25|PENDING_PAYMENT|
|2013-07-25|     PROCESSING|
|2013-07-25|       COMPLETE|
|2013-07-25|PENDING_PAYMENT|
|2013-07-25|       COMPLETE|
|2013-07-25|         CLOSED|
|2013-07-25|PENDING_PAYMENT|
|2013-07-25|     PROCESSING|
+----------+---------------+
only showing top 20 rows



**Distinct values and filtering by value**

In [9]:
unique_dates = df.select('order_date').distinct()  # one row per unique order_date
unique_dates.show()

                                                                                

+----------+
|order_date|
+----------+
|2013-09-09|
|2013-09-19|
|2014-06-03|
|2013-09-12|
|2014-01-24|
|2014-02-16|
|2014-06-11|
|2013-11-18|
|2014-02-18|
|2013-08-14|
|2013-10-05|
|2014-07-04|
|2014-07-06|
|2013-09-18|
|2013-09-20|
|2013-09-25|
|2014-06-13|
|2013-11-23|
|2013-09-14|
|2014-02-24|
+----------+
only showing top 20 rows



In [10]:
# Number of unique order dates; the bare name on the last line displays the value.
n_unique_dates = df.select('order_date').distinct().count()
n_unique_dates

                                                                                

364

In [11]:
# Orders placed on 24 Feb 2014.
df.where(df['order_date'] == '2014-02-24').show()

+-----+----------+------+---------------+
|   id|order_date|amount|         status|
+-----+----------+------+---------------+
|34670|2014-02-24|  8535|       COMPLETE|
|34671|2014-02-24|  9335|       COMPLETE|
|34672|2014-02-24|  1219|        PENDING|
|34673|2014-02-24| 10197|       COMPLETE|
|34674|2014-02-24|  9390|PENDING_PAYMENT|
|34675|2014-02-24| 10657|PENDING_PAYMENT|
|34676|2014-02-24|  8376|       COMPLETE|
|34677|2014-02-24| 12360|       CANCELED|
|34678|2014-02-24|  1492|         CLOSED|
|34679|2014-02-24| 11938|        ON_HOLD|
|34680|2014-02-24|  3738|       COMPLETE|
|34681|2014-02-24|  8242|        PENDING|
|34682|2014-02-24| 11685|     PROCESSING|
|34683|2014-02-24|  2438|       COMPLETE|
|34684|2014-02-24|   674|PENDING_PAYMENT|
|34685|2014-02-24|  3213|       COMPLETE|
|34686|2014-02-24|  6000|         CLOSED|
|34687|2014-02-24|  1352|         CLOSED|
|34688|2014-02-24|  8033|SUSPECTED_FRAUD|
|34689|2014-02-24| 10410|        ON_HOLD|
+-----+----------+------+---------------+

                                                                                

In [12]:
# Next cell: orders placed on 24 Feb 2014 with an amount of at most $1500 (inclusive, i.e. <= 1500)

from pyspark.sql.functions import col

In [13]:
# Combine two column predicates with & (each comparison is its own Column expression).
on_target_date = df['order_date'] == '2014-02-24'
amount_at_most_1500 = df['amount'] <= 1500
df.where(on_target_date & amount_at_most_1500).show()

+-----+----------+------+---------------+
|   id|order_date|amount|         status|
+-----+----------+------+---------------+
|34672|2014-02-24|  1219|        PENDING|
|34678|2014-02-24|  1492|         CLOSED|
|34684|2014-02-24|   674|PENDING_PAYMENT|
|34687|2014-02-24|  1352|         CLOSED|
|34707|2014-02-24|    64|SUSPECTED_FRAUD|
|34716|2014-02-24|   137|     PROCESSING|
|34738|2014-02-24|  1454|PENDING_PAYMENT|
|34742|2014-02-24|  1263|        PENDING|
|34743|2014-02-24|  1241|        PENDING|
|34747|2014-02-24|  1107|       COMPLETE|
|34768|2014-02-24|  1207|       COMPLETE|
|34780|2014-02-24|  1263|PENDING_PAYMENT|
|34813|2014-02-24|  1032|     PROCESSING|
|34815|2014-02-24|  1137|PENDING_PAYMENT|
|34826|2014-02-24|   515|       COMPLETE|
|63408|2014-02-24|   631|        PENDING|
|63415|2014-02-24|   312|       COMPLETE|
|63419|2014-02-24|   221|        PENDING|
|63420|2014-02-24|   291|       COMPLETE|
+-----+----------+------+---------------+



                                                                                

**Min, Max, Average**

In [14]:
# Smallest order amount in the whole dataset.
df.agg({'amount': 'min'}).show()

+-----------+
|min(amount)|
+-----------+
|          1|
+-----------+



In [15]:
# Largest order amount in the whole dataset.
df.agg({'amount': 'max'}).show()

+-----------+
|max(amount)|
+-----------+
|      12435|
+-----------+



In [16]:
# Mean order amount across the whole dataset.
df.agg({'amount': 'avg'}).show()

+-----------------+
|      avg(amount)|
+-----------------+
|6216.571098819738|
+-----------------+



In [17]:
# Smallest order amount within each status.
df.groupBy('status').agg({'amount': 'min'}).show()

+---------------+-----------+
|         status|min(amount)|
+---------------+-----------+
|PENDING_PAYMENT|          2|
|          test4|       8814|
|          test3|       4960|
|          test1|       5946|
|       COMPLETE|          1|
|        ON_HOLD|          2|
| PAYMENT_REVIEW|         12|
|          test2|       6704|
|     PROCESSING|          3|
|         CLOSED|          4|
|SUSPECTED_FRAUD|         16|
|        PENDING|          3|
|       CANCELED|         18|
+---------------+-----------+



In [18]:
# Largest order amount within each status.
df.groupBy('status').agg({'amount': 'max'}).show()

+---------------+-----------+
|         status|max(amount)|
+---------------+-----------+
|PENDING_PAYMENT|      12434|
|          test4|       8814|
|          test3|       4960|
|          test1|       5946|
|       COMPLETE|      12434|
|        ON_HOLD|      12434|
| PAYMENT_REVIEW|      12433|
|          test2|       6704|
|     PROCESSING|      12431|
|         CLOSED|      12434|
|SUSPECTED_FRAUD|      12429|
|        PENDING|      12435|
|       CANCELED|      12435|
+---------------+-----------+



**Creating a temporary view on a DataFrame to run SQL queries**

In [19]:
# Register df under a name that spark.sql(...) queries can reference.
view_name = 'tempView'
df.createOrReplaceTempView(view_name)

In [20]:
# Same filter as the DataFrame version above, expressed in SQL.
# Single quotes are the standard SQL string/date literal; double quotes are
# parsed as identifiers when spark.sql.ansi.doubleQuotedIdentifiers is enabled.
result = spark.sql("""
    SELECT *
    FROM tempView
    WHERE order_date = DATE '2014-02-24'
      AND amount <= 1500
""")

In [21]:
# Display up to 20 rows of the SQL result.
result.show(n=20)

[Stage 41:>                                                         (0 + 1) / 1]

+-----+----------+------+---------------+
|   id|order_date|amount|         status|
+-----+----------+------+---------------+
|34672|2014-02-24|  1219|        PENDING|
|34678|2014-02-24|  1492|         CLOSED|
|34684|2014-02-24|   674|PENDING_PAYMENT|
|34687|2014-02-24|  1352|         CLOSED|
|34707|2014-02-24|    64|SUSPECTED_FRAUD|
|34716|2014-02-24|   137|     PROCESSING|
|34738|2014-02-24|  1454|PENDING_PAYMENT|
|34742|2014-02-24|  1263|        PENDING|
|34743|2014-02-24|  1241|        PENDING|
|34747|2014-02-24|  1107|       COMPLETE|
|34768|2014-02-24|  1207|       COMPLETE|
|34780|2014-02-24|  1263|PENDING_PAYMENT|
|34813|2014-02-24|  1032|     PROCESSING|
|34815|2014-02-24|  1137|PENDING_PAYMENT|
|34826|2014-02-24|   515|       COMPLETE|
|63408|2014-02-24|   631|        PENDING|
|63415|2014-02-24|   312|       COMPLETE|
|63419|2014-02-24|   221|        PENDING|
|63420|2014-02-24|   291|       COMPLETE|
+-----+----------+------+---------------+



                                                                                

**Creating a new column whose values are derived from another column**

In [22]:
# List every status value, including the synthetic test1..test4 entries.
distinct_statuses = df.select('status').distinct()
distinct_statuses.show()

+---------------+
|         status|
+---------------+
|PENDING_PAYMENT|
|          test4|
|          test3|
|          test1|
|       COMPLETE|
|        ON_HOLD|
| PAYMENT_REVIEW|
|          test2|
|     PROCESSING|
|         CLOSED|
|SUSPECTED_FRAUD|
|        PENDING|
|       CANCELED|
+---------------+



**Method 1: `when` / `otherwise` column expressions**

In [23]:
from pyspark.sql.functions import when, col

In [25]:
# Relabel the synthetic test statuses (test1..test4) as 'TESTING'. Every other
# status, including NULL, is passed through unchanged by otherwise().
df2 = df.withColumn(
    'modified_status_1',
    when(df.status.isin('test1', 'test2', 'test3', 'test4'), 'TESTING')
    .otherwise(df.status),
)

In [26]:
# Preview the frame with the added modified_status_1 column.
df2.show(n=20)

+---+----------+------+---------------+-----------------+
| id|order_date|amount|         status|modified_status_1|
+---+----------+------+---------------+-----------------+
|  1|2013-07-25| 11599|         CLOSED|           CLOSED|
|  2|2013-07-25|   256|PENDING_PAYMENT|  PENDING_PAYMENT|
|  3|2013-07-25| 12111|       COMPLETE|         COMPLETE|
|  4|2013-07-25|  8827|         CLOSED|           CLOSED|
|  5|2013-07-25| 11318|       COMPLETE|         COMPLETE|
|  6|2013-07-25|  7130|       COMPLETE|         COMPLETE|
|  7|2013-07-25|  4530|       COMPLETE|         COMPLETE|
|  8|2013-07-25|  2911|     PROCESSING|       PROCESSING|
|  9|2013-07-25|  5657|PENDING_PAYMENT|  PENDING_PAYMENT|
| 10|2013-07-25|  5648|PENDING_PAYMENT|  PENDING_PAYMENT|
| 11|2013-07-25|   918| PAYMENT_REVIEW|   PAYMENT_REVIEW|
| 12|2013-07-25|  1837|         CLOSED|           CLOSED|
| 13|2013-07-25|  9149|PENDING_PAYMENT|  PENDING_PAYMENT|
| 14|2013-07-25|  9842|     PROCESSING|       PROCESSING|
| 15|2013-07-2

In [29]:
# Show only the synthetic test rows to confirm they were relabelled.
df2.filter(col('status').isin('test1', 'test2', 'test3', 'test4')).show()

+-----+----------+------+------+-----------------+
|   id|order_date|amount|status|modified_status_1|
+-----+----------+------+------+-----------------+
|68809|2014-03-12|  5946| test1|          TESTING|
|68817|2014-03-27|  6704| test2|          TESTING|
|68827|2014-04-16|  8814| test4|          TESTING|
|68871|2014-06-28|  4960| test3|          TESTING|
+-----+----------+------+------+-----------------+



                                                                                

**Method 2: SQL `CASE WHEN` via `expr`**

In [30]:
from pyspark.sql.functions import expr

In [31]:
# Same relabelling as Method 1, written as a SQL CASE expression.
# Unlike Method 1 (where NULL stays NULL), a NULL status becomes '' here.
# Each fragment ends with a space so the concatenated SQL stays well separated.
df3 = df.withColumn('modified_status', expr(
    "CASE WHEN status IN ('test1', 'test2', 'test3', 'test4') THEN 'TESTING' "
    "WHEN status IS NULL THEN '' "
    "ELSE status END"
))

In [32]:
# Preview the frame with the CASE-derived modified_status column.
df3.show(n=20)

+---+----------+------+---------------+---------------+
| id|order_date|amount|         status|modified_status|
+---+----------+------+---------------+---------------+
|  1|2013-07-25| 11599|         CLOSED|         CLOSED|
|  2|2013-07-25|   256|PENDING_PAYMENT|PENDING_PAYMENT|
|  3|2013-07-25| 12111|       COMPLETE|       COMPLETE|
|  4|2013-07-25|  8827|         CLOSED|         CLOSED|
|  5|2013-07-25| 11318|       COMPLETE|       COMPLETE|
|  6|2013-07-25|  7130|       COMPLETE|       COMPLETE|
|  7|2013-07-25|  4530|       COMPLETE|       COMPLETE|
|  8|2013-07-25|  2911|     PROCESSING|     PROCESSING|
|  9|2013-07-25|  5657|PENDING_PAYMENT|PENDING_PAYMENT|
| 10|2013-07-25|  5648|PENDING_PAYMENT|PENDING_PAYMENT|
| 11|2013-07-25|   918| PAYMENT_REVIEW| PAYMENT_REVIEW|
| 12|2013-07-25|  1837|         CLOSED|         CLOSED|
| 13|2013-07-25|  9149|PENDING_PAYMENT|PENDING_PAYMENT|
| 14|2013-07-25|  9842|     PROCESSING|     PROCESSING|
| 15|2013-07-25|  2568|       COMPLETE|       CO

In [33]:
# Show only the synthetic test rows to confirm the CASE expression relabelled them.
df3.filter(col('status').isin('test1', 'test2', 'test3', 'test4')).show()

+-----+----------+------+------+---------------+
|   id|order_date|amount|status|modified_status|
+-----+----------+------+------+---------------+
|68809|2014-03-12|  5946| test1|        TESTING|
|68817|2014-03-27|  6704| test2|        TESTING|
|68827|2014-04-16|  8814| test4|        TESTING|
|68871|2014-06-28|  4960| test3|        TESTING|
+-----+----------+------+------+---------------+



                                                                                

**Column Renaming**

In [34]:
# withColumnRenamed returns a new DataFrame; df2 itself keeps modified_status_1.
renamed_df = df2.withColumnRenamed('modified_status_1', 'updated_status')
renamed_df.show()

+---+----------+------+---------------+---------------+
| id|order_date|amount|         status| updated_status|
+---+----------+------+---------------+---------------+
|  1|2013-07-25| 11599|         CLOSED|         CLOSED|
|  2|2013-07-25|   256|PENDING_PAYMENT|PENDING_PAYMENT|
|  3|2013-07-25| 12111|       COMPLETE|       COMPLETE|
|  4|2013-07-25|  8827|         CLOSED|         CLOSED|
|  5|2013-07-25| 11318|       COMPLETE|       COMPLETE|
|  6|2013-07-25|  7130|       COMPLETE|       COMPLETE|
|  7|2013-07-25|  4530|       COMPLETE|       COMPLETE|
|  8|2013-07-25|  2911|     PROCESSING|     PROCESSING|
|  9|2013-07-25|  5657|PENDING_PAYMENT|PENDING_PAYMENT|
| 10|2013-07-25|  5648|PENDING_PAYMENT|PENDING_PAYMENT|
| 11|2013-07-25|   918| PAYMENT_REVIEW| PAYMENT_REVIEW|
| 12|2013-07-25|  1837|         CLOSED|         CLOSED|
| 13|2013-07-25|  9149|PENDING_PAYMENT|PENDING_PAYMENT|
| 14|2013-07-25|  9842|     PROCESSING|     PROCESSING|
| 15|2013-07-25|  2568|       COMPLETE|       CO

**Changing case with `upper` / `lower`**

In [35]:
from pyspark.sql.functions import upper, lower

In [40]:
# Lower-case the status column with the built-in function (no Python UDF needed).
df.select(lower('status')).show()

+---------------+
|  lower(status)|
+---------------+
|         closed|
|pending_payment|
|       complete|
|         closed|
|       complete|
|       complete|
|       complete|
|     processing|
|pending_payment|
|pending_payment|
| payment_review|
|         closed|
|pending_payment|
|     processing|
|       complete|
|pending_payment|
|       complete|
|         closed|
|pending_payment|
|     processing|
+---------------+
only showing top 20 rows



**Missing Values**

In [41]:
from pyspark.sql.functions import col,isnan, when, count

In [43]:
# Count nulls per column: when(...) yields a non-null literal only for null
# cells, and count() ignores nulls, so each count equals the number of nulls.
null_count_exprs = [
    count(when(col(column_name).isNull(), column_name)).alias(column_name)
    for column_name in df.columns
]
df.select(null_count_exprs).show()

+---+----------+------+------+
| id|order_date|amount|status|
+---+----------+------+------+
|  0|         0|     0|     0|
+---+----------+------+------+



                                                                                

**User Defined Functions**

In [44]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

In [45]:
# User-defined function
def upper_to_lower(status):
    """Return ``status`` lower-cased, or None when ``status`` is None.

    Spark hands SQL NULLs to Python UDFs as None; calling ``.lower()`` on None
    would raise AttributeError and fail the whole task, so NULL is passed through.
    """
    if status is None:
        return None
    return status.lower()

In [47]:
# Wrap the plain Python function as a Spark UDF whose result column is a string.
lowercase_udf = udf(upper_to_lower, returnType=StringType())

In [56]:
# Run the UDF on every row of `status`.
# NOTE(review): the new column holds the lower-cased *status*, despite its name 'name_lowercase'.
status_lowered = df.withColumn('name_lowercase', lowercase_udf(col('status')))
status_lowered.show(truncate=False)

+---+----------+------+---------------+---------------+
|id |order_date|amount|status         |name_lowercase |
+---+----------+------+---------------+---------------+
|1  |2013-07-25|11599 |CLOSED         |closed         |
|2  |2013-07-25|256   |PENDING_PAYMENT|pending_payment|
|3  |2013-07-25|12111 |COMPLETE       |complete       |
|4  |2013-07-25|8827  |CLOSED         |closed         |
|5  |2013-07-25|11318 |COMPLETE       |complete       |
|6  |2013-07-25|7130  |COMPLETE       |complete       |
|7  |2013-07-25|4530  |COMPLETE       |complete       |
|8  |2013-07-25|2911  |PROCESSING     |processing     |
|9  |2013-07-25|5657  |PENDING_PAYMENT|pending_payment|
|10 |2013-07-25|5648  |PENDING_PAYMENT|pending_payment|
|11 |2013-07-25|918   |PAYMENT_REVIEW |payment_review |
|12 |2013-07-25|1837  |CLOSED         |closed         |
|13 |2013-07-25|9149  |PENDING_PAYMENT|pending_payment|
|14 |2013-07-25|9842  |PROCESSING     |processing     |
|15 |2013-07-25|2568  |COMPLETE       |complete 

Traceback (most recent call last):
  File "/home/bigdatapedia/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 186, in manager
  File "/home/bigdatapedia/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 74, in worker
  File "/home/bigdatapedia/spark/python/lib/pyspark.zip/pyspark/worker.py", line 643, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/home/bigdatapedia/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 564, in read_int
    raise EOFError
EOFError


**Joins and Combining DataFrames**

In [59]:
# emp_id 3 has no address and emp_id 4 has no employee, so the join types below give different results.
employee_rows = [(1, "John", "Engineering"), (2, "Mike", "HR"), (3, "Sara", "Finance")]
address_rows = [(1, "NY"), (2, "LA"), (4, "DC")]

employees = spark.createDataFrame(employee_rows, ["emp_id", "name", "department"])
addresses = spark.createDataFrame(address_rows, ["emp_id", "address"])

In [60]:
# Inner join: keep only emp_ids present in both frames.
result_inner = employees.join(other=addresses, on='emp_id', how='inner')
result_inner.show()

                                                                                

+------+----+-----------+-------+
|emp_id|name| department|address|
+------+----+-----------+-------+
|     1|John|Engineering|     NY|
|     2|Mike|         HR|     LA|
+------+----+-----------+-------+



In [61]:
# Full outer join: keep every emp_id from either side, with null where one side has no match.
result_outer = employees.join(addresses, ['emp_id'], 'outer')
result_outer.show()

                                                                                

+------+----+-----------+-------+
|emp_id|name| department|address|
+------+----+-----------+-------+
|     1|John|Engineering|     NY|
|     3|Sara|    Finance|   null|
|     2|Mike|         HR|     LA|
|     4|null|       null|     DC|
+------+----+-----------+-------+



In [62]:
# Left outer join: keep every employee; address is null where no match exists.
result_left = employees.join(other=addresses, on='emp_id', how='left_outer')
result_left.show()

                                                                                

+------+----+-----------+-------+
|emp_id|name| department|address|
+------+----+-----------+-------+
|     1|John|Engineering|     NY|
|     3|Sara|    Finance|   null|
|     2|Mike|         HR|     LA|
+------+----+-----------+-------+



**Joining on Multiple Columns**

In [63]:
# NOTE: this rebinds df2, which earlier held the orders frame with modified_status_1.
people_columns = ["id", "first_name", "last_name"]
df1 = spark.createDataFrame([(1, "John", "Doe"), (2, "Mike", "Smith")], people_columns)
df2 = spark.createDataFrame([(1, "John", "Doe"), (2, "Mike", "Johnson")], people_columns)

In [64]:
# Join on both id and first_name. The non-key last_name column appears twice in
# the result (once per side), so referring to it by name alone would be ambiguous.
result = df1.join(df2, on=['id', 'first_name'], how='inner')
result.show()

+---+----------+---------+---------+
| id|first_name|last_name|last_name|
+---+----------+---------+---------+
|  1|      John|      Doe|      Doe|
|  2|      Mike|    Smith|  Johnson|
+---+----------+---------+---------+



**Broadcast Joins**

In [65]:
from pyspark.sql.functions import broadcast

In [66]:
# Hint Spark to ship the small `addresses` frame to every executor rather than shuffling both sides.
result = employees.join(other=broadcast(addresses), on='emp_id')
result.show()

+------+----+-----------+-------+
|emp_id|name| department|address|
+------+----+-----------+-------+
|     1|John|Engineering|     NY|
|     2|Mike|         HR|     LA|
+------+----+-----------+-------+

