## Transformation

- A transformation is an operation that creates a new RDD/DataFrame from an existing one.
- Transformations are lazy: Spark computes nothing until an action such as `.show()` or `.count()` is called.
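
The laziness described above can be illustrated without Spark. The following is a plain-Python toy model, not Spark's implementation: `LazyDataset`, `is_five_star`, and the sample ratings are hypothetical names and values invented for this sketch. A transformation only records a step in a plan, and the plan runs when an action is called.

```python
class LazyDataset:
    """Toy stand-in for a DataFrame: records steps, runs them only on an action."""

    def __init__(self, rows, steps=()):
        self._rows = rows
        self._steps = tuple(steps)  # the recorded "plan"

    # Transformations: return a NEW object and compute nothing.
    def filter(self, predicate):
        return LazyDataset(self._rows, self._steps + (("filter", predicate),))

    def map(self, func):
        return LazyDataset(self._rows, self._steps + (("map", func),))

    # Action: executes the recorded plan and returns a result.
    def count(self):
        rows = self._rows
        for kind, func in self._steps:
            if kind == "filter":
                rows = [r for r in rows if func(r)]
            else:
                rows = [func(r) for r in rows]
        return len(rows)


calls = []  # records every time the predicate actually runs


def is_five_star(stars):
    calls.append(stars)
    return stars == 5.0


reviews = LazyDataset([5.0, 4.0, 5.0, 1.0])  # made-up star ratings
five_star = reviews.filter(is_five_star)     # transformation: nothing runs yet
calls_before_action = len(calls)
five_star_count = five_star.count()          # action: the plan executes now
calls_after_action = len(calls)

print(calls_before_action, five_star_count, calls_after_action)
```

The predicate is never called until `count()` runs, and `reviews` itself is left unchanged because `filter` returns a new object.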

### Wide vs Narrow Transformation
|                    | Wide                  | Narrow     |
|--------------------|-----------------------|------------|
|Data Shuffling      |Yes - Across Partitions| No         |
|Speed               |Slower                 | Faster     |
|Partition Dependency|Many to Many           | 1 to 1     |
|Introvert/Extrovert |Extrovert              | Introvert  |
|Example             | joins, groupBy        | map, filter|
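
The "Partition Dependency" row can be made concrete with a small plain-Python simulation. This is a sketch under stated assumptions, not how Spark is implemented: the partitions are ordinary lists, the rows are made up, and `narrow_filter`, `wide_group_by_key`, and the modulo partitioner are hypothetical helpers. Each function also returns, for every output partition, the set of input partitions it read from.

```python
# Four input partitions of (stars, user_id) rows; the values are made up.
partitions = [
    [(5.0, "u1"), (4.0, "u2")],
    [(5.0, "u3"), (1.0, "u4")],
    [(4.0, "u5"), (5.0, "u6")],
    [(1.0, "u7"), (4.0, "u8")],
]


def narrow_filter(parts, predicate):
    """Narrow: each output partition is built from exactly one input partition."""
    out, deps = [], []
    for i, part in enumerate(parts):
        out.append([row for row in part if predicate(row)])
        deps.append({i})
    return out, deps


def wide_group_by_key(parts, num_out):
    """Wide: rows with the same key must meet in the same output partition."""
    out = [dict() for _ in range(num_out)]
    deps = [set() for _ in range(num_out)]
    for i, part in enumerate(parts):
        for key, value in part:
            target = int(key) % num_out  # deterministic toy partitioner
            out[target].setdefault(key, []).append(value)
            deps[target].add(i)
    return out, deps


filtered, narrow_deps = narrow_filter(partitions, lambda row: row[0] >= 4.0)
grouped, wide_deps = wide_group_by_key(partitions, num_out=2)

print("narrow dependencies:", narrow_deps)
print("wide dependencies:  ", wide_deps)
```

In the narrow case every output partition depends on a single input partition, while in the wide case each output partition pulls rows from several input partitions.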

## Wide Transformation

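- The driver splits each job into tasks and assigns them to multiple executors, each of which works on its own partitions of the data.
- Wide transformations are those where the executors need to exchange data with each other to produce the output.
- This exchange of data between executors is called a "shuffle".
- A shuffle is one of the most expensive operations in Spark, because it moves data across partitions, which typically involves network and disk I/O.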
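
To get a feel for why a shuffle is costly, the sketch below simulates a "count per star rating" across three executors in plain Python and counts how many rows have to leave the executor they started on. Everything here is an assumption made for illustration: the data is made up, and `executor_rows`, `owner_of`, and the modulo partitioner are hypothetical and do not reflect Spark's actual partitioning logic.

```python
from collections import Counter

# Star ratings initially held by three hypothetical executors (made-up data).
executor_rows = {
    0: [5.0, 4.0, 5.0, 1.0],
    1: [4.0, 5.0, 3.0],
    2: [1.0, 5.0, 4.0, 2.0],
}
num_executors = len(executor_rows)


def owner_of(stars):
    """Toy partitioner: decides which executor owns a given star rating."""
    return int(stars) % num_executors


# Shuffle: send every row to the executor that owns its key.
received = {executor: [] for executor in executor_rows}
rows_sent_over_network = 0
for source, rows in executor_rows.items():
    for stars in rows:
        target = owner_of(stars)
        if target != source:
            rows_sent_over_network += 1
        received[target].append(stars)

# After the shuffle, each executor can count its own keys independently.
counts = Counter()
for rows in received.values():
    counts.update(rows)

total_rows = sum(len(rows) for rows in executor_rows.values())
print(f"{rows_sent_over_network} of {total_rows} rows moved between executors")
print(dict(counts))
```

In this toy run most rows change executors before a single count can be produced, which is the kind of data movement a narrow transformation such as `filter` avoids.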

In [14]:
# Let's use groupBy to analyse a wide transformation

# Creating a Spark session
from pyspark.sql import SparkSession

sp = SparkSession.builder\
    .appName("Wide Transformation")\
    .getOrCreate()

print(sp.version)

# Reading data from a JSON file into a DataFrame
dataset_review = sp.read.json("C:\\Users\\imdig\\Desktop\\Kaggle Datasets\\Yelp Data\\yelp_academic_dataset_review.json")

# show() prints the rows itself and returns None, so it is not wrapped in print()
dataset_review.show(5)

3.5.5
+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|         business_id|cool|               date|funny|           review_id|stars|                text|useful|             user_id|
+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|XQfwVwDr-v0ZS3_Cb...|   0|2018-07-07 22:09:11|    0|KU_O5udG6zpxOg-Vc...|  3.0|If you decide to ...|     0|mh_-eMZ6K5RLWhZyI...|
|7ATYjTIgM3jUlt4UM...|   1|2012-01-03 15:28:18|    0|BiTunyQ73aT9WBnpR...|  5.0|I've taken a lot ...|     1|OyoGAe7OKpv6SyGZT...|
|YjUWPpI6HXG530lwP...|   0|2014-02-05 20:30:30|    0|saUsX_uimxRlCVr67...|  3.0|Family diner. Had...|     0|8g_iMtfSiwikVnbP2...|
|kxX2SOes4o-D3ZQBk...|   1|2015-01-04 00:01:03|    0|AqPFMleE6RsU23_au...|  5.0|Wow!  Yummy, diff...|     1|_7bHUi9Uuf5__HHc_...|
|e4Vwtrqf-wpJfwesg...|   1|2017-01-14 20:54:15|    0|Sx8TMOWLNuJBWer-0...|  4.0|Cute

In [None]:
# Remember: every transformation creates a new DataFrame/RDD; the original is left unchanged

# Question: count the reviews for each star rating
# (count() counts rows, i.e. reviews, not distinct users)
dataset_review_stars_grouping = dataset_review.select("stars","user_id")\
                                                .groupBy("stars")\
                                                .count()\
                                                .orderBy("count",ascending = False)

dataset_review_stars_grouping.show()

+-----+-------+
|stars|  count|
+-----+-------+
|  5.0|3231627|
|  4.0|1452918|
|  1.0|1069561|
|  3.0| 691934|
|  2.0| 544240|
+-----+-------+



In [15]:
# Let's do a join as well and then try some aggregations

dataset_user = sp.read.json("C:\\Users\\imdig\\Desktop\\Kaggle Datasets\\Yelp Data\\yelp_academic_dataset_user.json")

dataset_user.show(5)

+-------------+---------------+---------------+----------------+--------------+---------------+---------------+---------------+-----------------+----------------+------------------+-----------------+-----+--------------------+----+--------------------+-----+------+------------+------+--------------------+-------------------+
|average_stars|compliment_cool|compliment_cute|compliment_funny|compliment_hot|compliment_list|compliment_more|compliment_note|compliment_photos|compliment_plain|compliment_profile|compliment_writer| cool|               elite|fans|             friends|funny|  name|review_count|useful|             user_id|      yelping_since|
+-------------+---------------+---------------+----------------+--------------+---------------+---------------+---------------+-----------------+----------------+------------------+-----------------+-----+--------------------+----+--------------------+-----+------+------------+------+--------------------+-------------------+
|         3.91|    

In [36]:
# Let's find the user who wrote the most reviews


most_review_by_user = dataset_review.join(dataset_user, on = "user_id", how = "inner")\
                                  .select("user_id","name","stars")\
                                  .groupBy("user_id","name")\
                                  .count()\
                                  .orderBy("count",ascending = False)\
                                  .limit(1)

most_review_by_user.show()

+--------------------+-----+-----+
|             user_id| name|count|
+--------------------+-----+-----+
|_BcWyKQL16ndpBdgg...|Karen| 3048|
+--------------------+-----+-----+



In [41]:
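# We found our most valuable customer (the most active reviewer) above
# Now let's look at how that customer's reviews are distributed across star ratings
# Note: groupBy(...).count() adds a second column named "count", so the result has two:
# the user's total review count and the per-star count. With both sharing one name,
# orderBy("count") may not sort by the per-star count as intended.

val_cust_star_distribution = most_review_by_user.join(dataset_review, on = "user_id", how = "inner")\
                                               .select("name","stars","count")\
                                               .groupBy("name","count","stars")\
                                               .count()\
                                               .orderBy("count",ascending= False)

val_cust_star_distribution.show()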

+-----+-----+-----+-----+
| name|count|stars|count|
+-----+-----+-----+-----+
|Karen| 3048|  4.0| 1443|
|Karen| 3048|  2.0|  265|
|Karen| 3048|  1.0|   68|
|Karen| 3048|  5.0|  451|
|Karen| 3048|  3.0|  821|
+-----+-----+-----+-----+



In [42]:
sp.stop()