Data Wrangling part 1

There are several ways to access the data in a Spark DataFrame, and the variety easily confuses beginners. This notebook collects those access methods in one place and shows each of them in three languages: Python (PySpark), Scala and Java. Because this notebook cannot run Scala or Java, the Scala and Java snippets were run in IntelliJ and their output is pasted below each snippet.



Ref:
- [1] https://x1.inkenkun.com/archives/1114#SELECT
- [2] http://mogile.web.fc2.com/spark/spark210/sql-programming-guide.html
- [3] Superstore dataset: https://github.com/mikemooreviz/superstore

### Dependencies

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

In [0]:
!git clone https://github.com/mikemooreviz/superstore.git

Cloning into 'superstore'...
remote: Enumerating objects: 9, done.
remote: Total 9 (delta 0), reused 0 (delta 0), pack-reused 9
Unpacking objects: 100% (9/9), done.


In [0]:
import findspark
findspark.init()

from pyspark.sql import SparkSession

spark = SparkSession\
         .builder\
         .master("local[*]")\
         .appName("sample_app")\
         .enableHiveSupport()\
         .getOrCreate()

    // scala
        import org.apache.spark.sql.SparkSession
        import org.apache.spark.sql.functions.{col, column, when, round, sum}

        val spark = SparkSession
                    .builder()
                    .master("local[*]")
                    .appName("sample_app")
                    .getOrCreate()
        import spark.implicits._   // enables the $"col" and 'col column syntax
    


    // java
        import org.apache.spark.sql.SparkSession;
        import org.apache.spark.sql.Dataset;
        import org.apache.spark.sql.Row;
        import static org.apache.spark.sql.functions.*;

        SparkSession spark = SparkSession
                            .builder()
                            .master("local[*]")
                            .appName("sample_app")
                            .getOrCreate();

In [0]:
import pyspark.sql.functions as F

def shape(df):
    """Return (rows, columns), like pandas' DataFrame.shape. count() runs a Spark job."""
    return (df.count(), len(df.columns))

In [0]:
df_py = spark.read.csv("superstore/superstore.csv", header=True)

# Register the DataFrame as a temporary SQL view named df_py_SQL so that it can be
# queried like a table with spark.sql() (visible in this session only).
df_py.createOrReplaceTempView("df_py_SQL")

    // scala
        val df_scala = spark
                .read.format("csv")
                .option("header", "true")
                .load("C:/Users/CU - teminal/Desktop/Spark/Superstore.csv")

        df_scala.createOrReplaceTempView("df_scala_SQL")


    // java
        // Generics refresher: in List<String> myList = new ArrayList<String>(); the type
        // parameter (often written <T>, short for "type") is String. Likewise, Dataset<Row>
        // is a Dataset whose elements are Row objects.

        Dataset<Row> df_java = spark.read()
                .format("csv")
                .option("header", "true")
                .load("C:/Users/CU - teminal/Desktop/Spark/Superstore.csv");

        df_java.createOrReplaceTempView("df_java_SQL");

In [0]:
df_py.printSchema()

root
 |-- Row ID: string (nullable = true)
 |-- Order ID: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Ship Date: string (nullable = true)
 |-- Ship Mode: string (nullable = true)
 |-- Customer ID: string (nullable = true)
 |-- Customer Name: string (nullable = true)
 |-- Segment: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Postal Code: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Product ID: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Sub-Category: string (nullable = true)
 |-- Product Name: string (nullable = true)
 |-- Sales: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- Discount: string (nullable = true)
 |-- Profit: string (nullable = true)



    // scala
        df_scala.printSchema

    // java
        df_java.printSchema();

In [0]:
print(shape(df_py))

(9994, 21)


    // scala
        val r = df_scala.count
        val c = df_scala.columns.length
        print( s"$r, $c")


    // java
        // rows first, then columns, matching shape(df_py) above
        System.out.println(df_java.count() + "," + df_java.columns().length);

### ```.select``` and ```.where```

In [0]:
# Pyspark approach
from pyspark.sql import functions as F
df_py.select(F.col("Customer Name")).show(5)

# Pyspark-SQL style approach
query = """ SELECT `Customer Name`
            FROM   df_py_SQL
        """
spark.sql(query).show(5) 

# Pythonic approach
df_py.select(df_py["Customer Name"], 
             df_py[6]).show(5)

+---------------+
|  Customer Name|
+---------------+
|    Claire Gute|
|    Claire Gute|
|Darrin Van Huff|
| Sean O'Donnell|
| Sean O'Donnell|
+---------------+
only showing top 5 rows

+---------------+
|  Customer Name|
+---------------+
|    Claire Gute|
|    Claire Gute|
|Darrin Van Huff|
| Sean O'Donnell|
| Sean O'Donnell|
+---------------+
only showing top 5 rows

+---------------+---------------+
|  Customer Name|  Customer Name|
+---------------+---------------+
|    Claire Gute|    Claire Gute|
|    Claire Gute|    Claire Gute|
|Darrin Van Huff|Darrin Van Huff|
| Sean O'Donnell| Sean O'Donnell|
| Sean O'Donnell| Sean O'Donnell|
+---------------+---------------+
only showing top 5 rows
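In the "Pythonic approach" above, `df_py[6]` appears to select a column by its 0-based position in `df_py.columns`, which is why both output columns are `Customer Name`. The plain-Python sketch below (no Spark involved; the list is copied from the `printSchema()` output) only illustrates that position lookup:

```python
# Column order as reported by df_py.printSchema()
columns = [
    "Row ID", "Order ID", "Order Date", "Ship Date", "Ship Mode",
    "Customer ID", "Customer Name", "Segment", "Country", "City",
    "State", "Postal Code", "Region", "Product ID", "Category",
    "Sub-Category", "Product Name", "Sales", "Quantity", "Discount", "Profit",
]

# Positions are 0-based, so index 6 is the seventh column.
print(columns[6])                      # Customer Name
print(columns.index("Customer Name"))  # 6
print(len(columns))                    # 21, matching shape(df_py)
```

Positional access depends on the column order of the CSV, so selecting by name is the safer choice if the file layout might change.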



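    // scala
        // Column names are case-insensitive by default (spark.sql.caseSensitive=false),
        // so "Customer name" resolves to the "Customer Name" column.
        df_scala.select($"Customer name", 
                col("Customer name"),
                column("Customer name") as 'CUSTOMER,
                'Profit // Symbol syntax only works for column names without spaces
                ).show(3)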

        df_scala.createOrReplaceTempView("df_scala_SQL")
        val query = """
                    SELECT `Customer name`
                    FROM   df_scala_SQL
                    """
        spark.sql(query).show(3)
        ///////////////////////////////////////
        +---------------+---------------+---------------+-------+
        |  Customer name|  Customer name|       CUSTOMER| Profit|
        +---------------+---------------+---------------+-------+
        |    Claire Gute|    Claire Gute|    Claire Gute|41.9136|
        |    Claire Gute|    Claire Gute|    Claire Gute|219.582|
        |Darrin Van Huff|Darrin Van Huff|Darrin Van Huff| 6.8714|
        +---------------+---------------+---------------+-------+

        +---------------+
        |  Customer name|
        +---------------+
        |    Claire Gute|
        |    Claire Gute|
        |Darrin Van Huff|
        +---------------+




    // java
        df_java.select("*").limit(5).show();
        df_java.select(col("Customer name").as("CUSTOMER"),
                       column("Customer name")).show(5);

        df_java.createOrReplaceTempView("df_java_SQL");
        String query = "SELECT `Customer name` as CUSTOMER FROM df_java_SQL";
        Dataset<Row> selected_row = spark.sql(query);
        selected_row.show(5);
        ///////////////////////////////////////
        +---------------+---------------+
        |       CUSTOMER|  Customer name|
        +---------------+---------------+
        |    Claire Gute|    Claire Gute|
        |    Claire Gute|    Claire Gute|
        |Darrin Van Huff|Darrin Van Huff|
        | Sean O'Donnell| Sean O'Donnell|
        | Sean O'Donnell| Sean O'Donnell|
        +---------------+---------------+

        +---------------+
        |       CUSTOMER|
        +---------------+
        |    Claire Gute|
        |    Claire Gute|
        |Darrin Van Huff|
        | Sean O'Donnell|
        | Sean O'Donnell|
        +---------------+


#### Issue 1: spaces in column names and values
* Solution 1: wrap the column name in backticks (`` ` ``) whenever you reference it in SQL.
* Solution 2: replace the spaces with underscores (`_`).

Note: only identifiers need backticks. The string literal after `LIKE` in a `WHERE` clause may contain spaces as-is.
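When SQL strings are built programmatically, Solution 1 can be applied mechanically. The helpers below are hypothetical (not part of PySpark); they assume Spark SQL's backtick rule, where a literal backtick inside a quoted identifier is escaped by doubling it:

```python
from typing import List


def quote_ident(name: str) -> str:
    """Wrap a column name in backticks for Spark SQL (hypothetical helper).

    A backtick inside the name is escaped by doubling it.
    """
    return "`" + name.replace("`", "``") + "`"


def select_query(table: str, columns: List[str]) -> str:
    """Build a simple SELECT statement with every column name quoted."""
    cols = ", ".join(quote_ident(c) for c in columns)
    return f"SELECT {cols} FROM {table}"


print(select_query("df_py_SQL", ["Customer Name", "Postal Code"]))
# SELECT `Customer Name`, `Postal Code` FROM df_py_SQL
```

The resulting string can then be passed to `spark.sql(...)` like the hand-written queries in this notebook.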

In [0]:
query = """
        SELECT    `Customer Name`
        FROM      df_py_SQL
        WHERE     `Customer Name` LIKE "Claire Gute" 
        """
# backticks (`...`) quote a column name that contains spaces
# double quotes ("...") mark a string literal; LIKE without the % or _ wildcards behaves like =

spark.sql(query).show(5)

+-------------+
|Customer Name|
+-------------+
|  Claire Gute|
|  Claire Gute|
|  Claire Gute|
|  Claire Gute|
|  Claire Gute|
+-------------+



In [0]:
# Another solution (not recommended): replace spaces with underscores.
# Note that under_score_value rewrites every value in every column, so the data itself changes.

def under_score_col(df):
    for col in df.columns:
        df = df.withColumnRenamed(col , col.replace(" ", "_"))
    return df

def under_score_value(df):
    for col in df.columns:
        df = df.withColumn(col , F.regexp_replace(F.col(col), " ", "_"))
    return df


unders_df_py = under_score_value(under_score_col(df_py))
unders_df_py.createOrReplaceTempView("unders_df_py_SQL")
query = """
        SELECT    Customer_Name
        FROM      unders_df_py_SQL
        WHERE     Customer_Name LIKE "Claire_Gute" 
        """

spark.sql(query).show(5)

+-------------+
|Customer_Name|
+-------------+
|  Claire_Gute|
|  Claire_Gute|
|  Claire_Gute|
|  Claire_Gute|
|  Claire_Gute|
+-------------+



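### ```F.when```, ```df.where``` or ```df.filter```?
```.where``` and ```.filter``` are aliases of each other: both keep only the rows that satisfy the condition and leave the columns unchanged. ```F.when``` is different: it is a column expression rather than a filter, so it keeps every row and, when used inside ```select```, adds a new column whose value is null wherever the condition fails (unless ```.otherwise(...)``` is given). That makes ```F.when``` closer to feature engineering than to filtering.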
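The difference in result shape can be illustrated without Spark. The plain-Python sketch below uses three sample rows whose values are copied from the outputs in this notebook; it mimics the semantics only, with `None` standing in for Spark's null:

```python
# Three sample rows; the values are copied from outputs in this notebook.
rows = [
    {"Category": "Furniture", "Profit": 41.9136},
    {"Category": "Furniture", "Profit": -383.031},
    {"Category": "Office Supplies", "Profit": 6.8714},
]

# .where / .filter: rows failing the predicate are dropped, columns are unchanged.
filtered = [r for r in rows if r["Profit"] > 0]

# F.when without .otherwise(): every row is kept and a new column is added;
# rows failing the predicate get None, playing the role of Spark's null.
with_flag = [dict(r, my_condition=1 if r["Profit"] > 0 else None) for r in rows]

print(len(filtered), len(filtered[0]))    # 2 rows, 2 columns
print(len(with_flag), len(with_flag[0]))  # 3 rows, 3 columns
print(with_flag[1]["my_condition"])       # None
```

This is the same pattern as the Java counts further down: `gt_0 (filter)` keeps fewer rows with 2 columns, while `gt_0 (when)` keeps all 9994 rows with 3 columns.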



In [0]:
# DataFrame methods: where() and filter() are aliases and can be chained before or after select()
df_py.where(df_py["Profit"] > 0).select("Category", "Profit").show(5)
df_py.select("Category", "Profit").where(df_py["Profit"] > 0).show(5)
df_py.select("Category", "Profit").filter(df_py.Profit > 0).show(5)
query = """ SELECT Category, 
                   Profit
            FROM   df_py_SQL
            WHERE  Profit > 0 
        """
spark.sql(query).show(5)

# F.when is a column expression from pyspark.sql.functions, not a DataFrame method,
# so it goes inside select() (or withColumn()); rows failing the condition get null.
df_py.select("Category", "Profit", F.when(df_py.Profit > 0, 1).alias("my_condition")).show(5)

+---------------+-------+
|       Category| Profit|
+---------------+-------+
|      Furniture|41.9136|
|      Furniture|219.582|
|Office Supplies| 6.8714|
|Office Supplies| 2.5164|
|      Furniture|14.1694|
+---------------+-------+
only showing top 5 rows

+---------------+-------+
|       Category| Profit|
+---------------+-------+
|      Furniture|41.9136|
|      Furniture|219.582|
|Office Supplies| 6.8714|
|Office Supplies| 2.5164|
|      Furniture|14.1694|
+---------------+-------+
only showing top 5 rows

+---------------+-------+
|       Category| Profit|
+---------------+-------+
|      Furniture|41.9136|
|      Furniture|219.582|
|Office Supplies| 6.8714|
|Office Supplies| 2.5164|
|      Furniture|14.1694|
+---------------+-------+
only showing top 5 rows

+---------------+-------+
|       Category| Profit|
+---------------+-------+
|      Furniture|41.9136|
|      Furniture|219.582|
|Office Supplies| 6.8714|
|Office Supplies| 2.5164|
|      Furniture|14.1694|
+--------------

    // scala
        /* Unlike SQL's fixed clause order (SELECT-FROM-WHERE-GROUP BY-HAVING-ORDER BY), DataFrame methods can be chained in any valid order, just as in PySpark. */
    
        df_scala.select("Category", "Profit").where($"Profit" > 0).show(3)
        df_scala.where($"Profit" > 0).select("Category", "Profit").show(3)
        df_scala.select($"Category", $"Profit").filter($"Profit" > 0).show(3)
        val query = """
                    SELECT  Category, 
                            Profit
                    FROM    df_scala_SQL
                    WHERE   Profit > 0
                    """
        spark.sql(query).show(3)

        import org.apache.spark.sql.functions.when
        df_scala.select($"Category", $"Profit", when($"Profit" > 0, 1).alias("my_condition")).show(5)
        ///////////////////////////////////////

        +---------------+-------+
        |       Category| Profit|
        +---------------+-------+
        |      Furniture|41.9136|
        |      Furniture|219.582|
        |Office Supplies| 6.8714|
        +---------------+-------+
        only showing top 3 rows

        +---------------+-------+
        |       Category| Profit|
        +---------------+-------+
        |      Furniture|41.9136|
        |      Furniture|219.582|
        |Office Supplies| 6.8714|
        +---------------+-------+
        only showing top 3 rows

        +---------------+-------+
        |       Category| Profit|
        +---------------+-------+
        |      Furniture|41.9136|
        |      Furniture|219.582|
        |Office Supplies| 6.8714|
        +---------------+-------+
        only showing top 3 rows

        +---------------+-------+
        |       Category| Profit|
        +---------------+-------+
        |      Furniture|41.9136|
        |      Furniture|219.582|
        |Office Supplies| 6.8714|
        +---------------+-------+
        only showing top 3 rows

        +---------------+--------+------------+
        |       Category|  Profit|my_condition|
        +---------------+--------+------------+
        |      Furniture| 41.9136|           1|
        |      Furniture| 219.582|           1|
        |Office Supplies|  6.8714|           1|
        |      Furniture|-383.031|        null|
        |Office Supplies|  2.5164|           1|
        +---------------+--------+------------+
        only showing top 5 rows

        
        

    // java

        Dataset<Row> selected_row2 = df_java.select("Category", "Profit").where(col("Profit").gt(0));
        selected_row2.show(3);

         //https://spark.apache.org/docs/1.5.2/api/java/org/apache/spark/sql/Column.html

        // Keep a trailing space at the end of each string piece; otherwise the concatenated SQL keywords run together.
        String query2 = "SELECT `Category`, `Profit` " +
                        "FROM df_java_SQL " +
                        "WHERE Profit = 0";
        Dataset<Row> eq_0 = spark.sql(query2);

        Dataset<Row> gt_0 = df_java
                        .select("Category", "Profit")
                        .filter(col("Profit").gt(0));

        Dataset<Row> gt_0w = df_java
                        .select("Category", "Profit")
                        .where(col("Profit").gt(0));

        Dataset<Row> gt_00 = df_java
                        .select("Category", "Profit")
                        .where(col("Profit").$greater(0));

        // greater than or equal to
        Dataset<Row> geq_0 = df_java
                        .select("Category", "Profit")
                        .where(col("Profit").geq(0));

        Dataset<Row> gt_eq_0 = df_java
                        .select("Category", "Profit")
                        .where(col("Profit").$greater$eq(0));

        Dataset<Row> gt_0when= df_java.select(col("Category"),
                        col("Profit"),
                        when(col("Profit").gt(0), 1));


        System.out.println( "eq_0: " + eq_0.count()+","+eq_0.columns().length + "\n" +
                            "gt_0 (filter): " + gt_0.count()+","+gt_0.columns().length + "\n" +
                            "gt_0 (where): " + gt_0w.count()+","+gt_0w.columns().length + "\n" +
                            "gt_0 (when): " + gt_0when.count()+","+gt_0when.columns().length + "\n" +
                            "$greater_0: " + gt_00.count()+","+gt_00.columns().length + "\n" +
                            "geq_0: " + geq_0.count()+","+geq_0.columns().length + "\n" +
                            "$greater$eq: " + gt_eq_0.count()+","+gt_eq_0.columns().length);

        /////////////////////
        eq_0: 638,2
        gt_0 (filter): 7583,2
        gt_0 (where): 7583,2
        gt_0 (when): 9994,3
        $greater_0: 7583,2
        geq_0: 8221,2
        $greater$eq: 8221,2

### Aggregation functions

In [0]:
# https://stackoverflow.com/questions/47046827/trouble-with-pyspark-round-function
a_part = df_py.groupBy(["Category"])\
              .agg({"Profit":"sum"}) # K-V method

a_part.show()

a_part.withColumn("sum(Profit)", F.round(a_part["sum(Profit)"], 2)).show()

+---------------+------------------+
|       Category|       sum(Profit)|
+---------------+------------------+
|Office Supplies|120632.87839999991|
|      Furniture| 19686.42720000003|
|     Technology|145388.29659999989|
+---------------+------------------+

+---------------+-----------+
|       Category|sum(Profit)|
+---------------+-----------+
|Office Supplies|  120632.88|
|      Furniture|   19686.43|
|     Technology|   145388.3|
+---------------+-----------+



    // scala
        // K-V method
        import org.apache.spark.sql.functions.sum  // Map(...) below is Scala's built-in Map, so functions.map is not needed
        df_scala.groupBy($"Category")
                .agg(Map("Profit" -> "sum"))
                .show()

        // general method
        df_scala.groupBy($"Category")
                .agg(sum($"Profit"))
                .show()

        //////////////////////////////////
        +---------------+------------------+
        |       Category|       sum(Profit)|
        +---------------+------------------+
        |Office Supplies|120632.87839999991|
        |      Furniture| 19686.42720000003|
        |     Technology|145388.29659999989|
        +---------------+------------------+

        +---------------+------------------+
        |       Category|       sum(Profit)|
        +---------------+------------------+
        |Office Supplies|120632.87839999991|
        |      Furniture| 19686.42720000003|
        |     Technology|145388.29659999989|
        +---------------+------------------+

    // java
        Dataset<Row> agg1 = df_java.groupBy(col("Category"))
                                   .agg(sum(col("Profit")));
        agg1.show(3);

        ////////////////////////////////////////
        +---------------+------------------+
        |       Category|       sum(Profit)|
        +---------------+------------------+
        |Office Supplies|120632.87839999991|
        |      Furniture| 19686.42720000003|
        |     Technology|145388.29659999989|
        +---------------+------------------+
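The dictionary ("K-V") form of `agg` maps each column to one function. Because a Python dict cannot hold the same key twice, that form cannot request several aggregations of the same column, while a list of `(column, function)` pairs can. The sketch below is plain Python; in PySpark, such pairs could drive something like `df_py.groupBy("Category").agg(*[getattr(F, func)(c) for c, func in pairs])`, which uses the same `F.min`/`F.max`/`F.sum` functions as the next cell:

```python
# Duplicate keys in a dict literal are legal Python, but only the last one survives.
agg_spec = {"Profit": "min", "Profit": "max"}
print(agg_spec)  # {'Profit': 'max'}

# A list of (column, function) pairs keeps every request.
pairs = [("Profit", "min"), ("Profit", "max"), ("Profit", "sum")]

# Spark's default names for such aggregates look like "min(Profit)".
names = [f"{func}({col})" for col, func in pairs]
print(names)  # ['min(Profit)', 'max(Profit)', 'sum(Profit)']
```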
        

In [0]:
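# Multiple aggregations
# Note: every column was read as a string (see printSchema above), so MIN/MAX compare
# the values as text (lexicographically), while SUM casts them to double first.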

df_py.groupBy("Category")\
     .agg(F.min("Profit"),
          F.max("Profit"),
          F.sum("Profit"))\
     .show()

     
query = """ 
        SELECT    Category, 
                  MIN(Profit), 
                  MAX(Profit), 
                  SUM(Profit)
        FROM      df_py_SQL
        GROUP BY  Category
        """
spark.sql(query).show()

+---------------+-----------+-----------+------------------+
|       Category|min(Profit)|max(Profit)|       sum(Profit)|
+---------------+-----------+-----------+------------------+
|Office Supplies|    -0.2098|    99.9408|120632.87839999991|
|      Furniture|    -0.3398|     99.432| 19686.42720000003|
|     Technology|    -0.0895|    98.2722|145388.29659999989|
+---------------+-----------+-----------+------------------+

+---------------+-----------+-----------+---------------------------+
|       Category|min(Profit)|max(Profit)|sum(CAST(Profit AS DOUBLE))|
+---------------+-----------+-----------+---------------------------+
|Office Supplies|    -0.2098|    99.9408|         120632.87839999991|
|      Furniture|    -0.3398|     99.432|          19686.42720000003|
|     Technology|    -0.0895|    98.2722|         145388.29659999989|
+---------------+-----------+-----------+---------------------------+
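The `min(Profit)` of `-0.2098` and `max(Profit)` of `99.9408` above are text comparisons, not numeric ones. The plain-Python sketch below reproduces the effect with Profit values copied from this notebook's outputs. In Spark, the usual fixes are reading the file with `spark.read.csv(path, header=True, inferSchema=True)` or casting first, e.g. `F.col("Profit").cast("double")`:

```python
# Profit values as strings, the way the CSV was read (all columns are string).
profits = ["41.9136", "219.582", "-383.031", "99.9408", "-0.2098"]

# Text comparison: "9" > "4" > "2" > "-", so "99.9408" is the "largest".
print(max(profits), min(profits))  # 99.9408 -0.2098

# Numeric comparison after casting.
nums = [float(p) for p in profits]
print(max(nums), min(nums))        # 219.582 -383.031
```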



### Rounding DataFrame values with `F.round`

`F.round` comes from `pyspark.sql.functions` (imported as `from pyspark.sql import functions as F`).

In [0]:
# Single aggregation

# K-V method
df_py.groupBy(["Category"])\
     .agg({"Profit":"sum"})\
     .show()

# general method
from pyspark.sql import functions as F
df_py.groupBy(["Category"])\
     .agg(F.sum("Profit"))\
     .show()

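# This returns only the Category column: GroupedData.sum() with no arguments sums
# numeric columns only, and every column here was read as a string.
# (Reading with inferSchema=True, or casting Profit to double, would make it work.)
# http://sinhrks.hatenablog.com/entry/2015/04/29/085353
df_py.groupby(["Category"]).sum().show() 
# .count() still works because it counts rows regardless of column types.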

+---------------+------------------+
|       Category|       sum(Profit)|
+---------------+------------------+
|Office Supplies|120632.87839999991|
|      Furniture| 19686.42720000003|
|     Technology|145388.29659999989|
+---------------+------------------+

+---------------+------------------+
|       Category|       sum(Profit)|
+---------------+------------------+
|Office Supplies|120632.87839999991|
|      Furniture| 19686.42720000003|
|     Technology|145388.29659999989|
+---------------+------------------+

+---------------+
|       Category|
+---------------+
|Office Supplies|
|      Furniture|
|     Technology|
+---------------+



    // scala
        val a_part = df_scala.groupBy($"Category")
                            .agg(Map("Profit" -> "sum"))
                        

        a_part.withColumn("sum(Profit)", 
                        round($"sum(Profit)", 2)
                        ).show(3)

        /////////////////////////
        +---------------+-----------+
        |       Category|sum(Profit)|
        +---------------+-----------+
        |Office Supplies|  120632.88|
        |      Furniture|   19686.43|
        |     Technology|   145388.3|
        +---------------+-----------+

    // java
        Dataset<Row> agg1 = df_java.groupBy(col("Category"))
                                   .agg(sum(col("Profit")));

        Dataset<Row> r_agg1 = agg1.withColumn("sum(Profit)",
                                    round(col("sum(Profit)"), 2));
        r_agg1.show(3);
        
        ///////////////////////////////////
        +---------------+-----------+
        |       Category|sum(Profit)|
        +---------------+-----------+
        |Office Supplies|  120632.88|
        |      Furniture|   19686.43|
        |     Technology|   145388.3|
        +---------------+-----------+
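Two details of the rounding above are easy to miss. `F.round` rounds half up (HALF_UP), whereas `F.bround` rounds half to even (HALF_EVEN), and Python's built-in `round` also rounds half to even. Also, the result stays a double, so `145388.30` is displayed as `145388.3`; for a fixed two-decimal display, `F.format_number(col, 2)` returns a formatted string instead. The plain-Python sketch below uses the `decimal` module to compare the two modes; it illustrates the semantics only and does not call Spark:

```python
from decimal import Decimal, ROUND_HALF_UP, ROUND_HALF_EVEN


def round_decimal(value: str, places: int, mode: str) -> Decimal:
    """Round a decimal string to `places` digits using the given rounding mode."""
    step = Decimal(10) ** -places  # places=2 -> Decimal('0.01')
    return Decimal(value).quantize(step, rounding=mode)


print(round_decimal("2.5", 0, ROUND_HALF_UP))    # 3, like F.round
print(round_decimal("2.5", 0, ROUND_HALF_EVEN))  # 2, like F.bround
print(round(2.5))                                # 2, Python rounds half to even

total = 145388.29659999989  # sum(Profit) for Technology above
print(round(total, 2))      # 145388.3  (a float drops the trailing zero)
print(f"{total:.2f}")       # 145388.30 (fixed two-decimal text)
```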
        

### Window functions

In [0]:
query = """
        SELECT     `Customer name`,
                   `Postal Code`, 
                   `Region`,
                   `Quantity`,
                    SUM(`Quantity`) OVER (PARTITION BY `Customer name`) AS `SUM quantity of each customer`,
                    Max(`Quantity`) OVER (PARTITION BY `Region`) AS `MAX quantity of each region`
         FROM       df_py_SQL
         WHERE     `Postal Code` > 80000
         ORDER BY  `Customer name`
         """


print("how many items had been bought by customer who are living in the west region??? ") # Assume that postal_code of west reg is greater than 80k 
spark.sql(query).show(10)

how many items had been bought by customer who are living in the west region??? 
+--------------+-----------+------+--------+-----------------------------+---------------------------+
| Customer name|Postal Code|Region|Quantity|SUM quantity of each customer|MAX quantity of each region|
+--------------+-----------+------+--------+-----------------------------+---------------------------+
| Aaron Bergman|      98103|  West|       3|                          7.0|                     98.352|
| Aaron Bergman|      98103|  West|       3|                          7.0|                     98.352|
| Aaron Bergman|      98103|  West|       1|                          7.0|                     98.352|
| Aaron Hawkins|      94109|  West|       4|                         23.0|                     98.352|
| Aaron Hawkins|      90004|  West|       2|                         23.0|                     98.352|
| Aaron Hawkins|      94122|  West|       2|                         23.0|                     

    // scala
        val query = """
                SELECT  `Customer name`,
                        `Postal Code`, 
                        `Region`,
                        `Quantity`,
                        SUM(`Quantity`) OVER (PARTITION BY `Customer name`) AS `SUM quantity of each customer`,
                        Max(`Quantity`) OVER (PARTITION BY `Region`) AS `MAX quantity of each region`
                FROM       df_scala_SQL
                WHERE     `Postal Code` > 80000
                ORDER BY  `Customer name`
                """

        // Assume that postal codes in the West region are greater than 80000
        println("how many items had been bought by customer who are living in the west region??? ")
        spark.sql(query).show(10)
        /////////////////////////////////////

        how many items had been bought by customer who are living in the west region??? 
        +--------------+-----------+------+--------+-----------------------------+---------------------------+
        | Customer name|Postal Code|Region|Quantity|SUM quantity of each customer|MAX quantity of each region|
        +--------------+-----------+------+--------+-----------------------------+---------------------------+
        | Aaron Bergman|      98103|  West|       3|                          7.0|                     98.352|
        | Aaron Bergman|      98103|  West|       3|                          7.0|                     98.352|
        | Aaron Bergman|      98103|  West|       1|                          7.0|                     98.352|
        | Aaron Hawkins|      90004|  West|       2|                         23.0|                     98.352|
        | Aaron Hawkins|      94109|  West|       4|                         23.0|                     98.352|
        | Aaron Hawkins|      90004|  West|       6|                         23.0|                     98.352|
        | Aaron Hawkins|      94122|  West|       2|                         23.0|                     98.352|
        | Aaron Hawkins|      94122|  West|       9|                         23.0|                     98.352|
        |Aaron Smayling|      94110|  West|       2|                         12.0|                     98.352|
        |Aaron Smayling|      91104|  West|       3|                         12.0|                     98.352|
        +--------------+-----------+------+--------+-----------------------------+---------------------------+


    // java
        String query3 =
                " SELECT   `Customer name`, " +
                        " `Postal Code`," +
                        " `Region`," +
                        " `Quantity`," +
                        " SUM(`Quantity`) OVER (PARTITION BY `Customer name`) AS `SUM quantity of each customer`, " +
                        " Max(`Quantity`) OVER (PARTITION BY `Region`) AS `MAX quantity of each region` " +
                        " FROM     df_java_SQL " +
                        " WHERE    `Postal Code` > 80000 " +
                        " ORDER BY `Customer name` ";

        // Assume that postal_code of west reg is greater than 80k
        System.out.println("how many items had been bought by customer who are living in the west region??? ");
        spark.sql(query3).show(10);
        /////////////////////////////////////

        how many items had been bought by customer who are living in the west region??? 
        +--------------+-----------+------+--------+-----------------------------+---------------------------+
        | Customer name|Postal Code|Region|Quantity|SUM quantity of each customer|MAX quantity of each region|
        +--------------+-----------+------+--------+-----------------------------+---------------------------+
        | Aaron Bergman|      98103|  West|       3|                          7.0|                     98.352|
        | Aaron Bergman|      98103|  West|       3|                          7.0|                     98.352|
        | Aaron Bergman|      98103|  West|       1|                          7.0|                     98.352|
        | Aaron Hawkins|      94109|  West|       4|                         23.0|                     98.352|
        | Aaron Hawkins|      90004|  West|       2|                         23.0|                     98.352|
        | Aaron Hawkins|      94122|  West|       2|                         23.0|                     98.352|
        | Aaron Hawkins|      94122|  West|       9|                         23.0|                     98.352|
        | Aaron Hawkins|      90004|  West|       6|                         23.0|                     98.352|
        |Aaron Smayling|      94110|  West|       2|                         12.0|                     98.352|
        |Aaron Smayling|      97756|  West|       7|                         12.0|                     98.352|
        +--------------+-----------+------+--------+-----------------------------+---------------------------+

        


### Aggregate all columns of a DataFrame at once

- https://stackoverflow.com/questions/56256785/pyspark-aggregate-all-columns-of-a-dataframe-at-once?noredirect=1&lq=1
- https://stackoverflow.com/questions/33882894/spark-sql-apply-aggregate-functions-to-a-list-of-columns
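A common PySpark pattern is to build the aggregate expressions in a list comprehension, e.g. `df_py.agg(*[F.sum(c).alias(c) for c in cols])` or the same inside `groupBy(...).agg(...)`. For the SQL style used in this notebook, the query text can be generated the same way. The helper below is a hypothetical sketch (not a PySpark API) that quotes every identifier with backticks, since several Superstore columns contain spaces:

```python
from typing import List, Optional


def q(name: str) -> str:
    """Backtick-quote a Spark SQL identifier, doubling any embedded backtick."""
    return "`" + name.replace("`", "``") + "`"


def agg_all_query(table: str, columns: List[str], func: str = "SUM",
                  group_by: Optional[str] = None) -> str:
    """Apply one aggregate function to every column in `columns` (hypothetical helper)."""
    exprs = [f"{func}({q(c)}) AS {q(func.lower() + '_' + c)}" for c in columns]
    if group_by is not None:
        exprs.insert(0, q(group_by))  # keep the grouping column in the output
    sql = f"SELECT {', '.join(exprs)} FROM {table}"
    if group_by is not None:
        sql += f" GROUP BY {q(group_by)}"
    return sql


print(agg_all_query("df_py_SQL", ["Sales", "Profit"], group_by="Category"))
# SELECT `Category`, SUM(`Sales`) AS `sum_Sales`, SUM(`Profit`) AS `sum_Profit` FROM df_py_SQL GROUP BY `Category`
```

Usage would be `spark.sql(agg_all_query("df_py_SQL", ["Sales", "Quantity", "Profit"], group_by="Category")).show()`.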


### Aggregation function vs Window function
![alt text](https://static.packt-cdn.com/products/9781783989188/graphics/B01781_06_02.jpg)

Image credit: https://static.packt-cdn.com/products/9781783989188/graphics/B01781_06_02.jpg
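The key difference is the number of output rows: `GROUP BY` collapses each group into one row, while a window function such as `SUM(...) OVER (PARTITION BY ...)` keeps every input row and attaches the group's result to it. The plain-Python sketch below mimics both on a few rows copied from the window-function output above; only a subset of Aaron Hawkins's rows is included, so his total here differs from the 23.0 shown there:

```python
from collections import defaultdict
from typing import Dict, List, Tuple

# (Customer name, Quantity) rows copied from the window-function output (a subset).
rows: List[Tuple[str, int]] = [
    ("Aaron Bergman", 3),
    ("Aaron Bergman", 3),
    ("Aaron Bergman", 1),
    ("Aaron Hawkins", 4),
    ("Aaron Hawkins", 2),
]


def group_sum(rows: List[Tuple[str, int]]) -> Dict[str, int]:
    """Like GROUP BY name with SUM(qty): one result per customer."""
    totals: Dict[str, int] = defaultdict(int)
    for name, qty in rows:
        totals[name] += qty
    return dict(totals)


def window_sum(rows: List[Tuple[str, int]]) -> List[Tuple[str, int, int]]:
    """Like SUM(qty) OVER (PARTITION BY name): every row kept, partition total attached."""
    totals = group_sum(rows)
    return [(name, qty, totals[name]) for name, qty in rows]


print(group_sum(rows))   # {'Aaron Bergman': 7, 'Aaron Hawkins': 6}
for row in window_sum(rows):
    print(row)           # ('Aaron Bergman', 3, 7), ... five rows in total
```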

In [0]:
print("how much contribution of each customer (across customer) in each region when return items(-) are not concerned")
query = """SELECT  `Customer name`,
                   Profit,
                   Region,
                   Quantity,
                   Discount, 
                   MAX(Profit) OVER (PARTITION BY `Customer name`) AS MAX_profit_each_customer,
                   SUM(Profit) OVER (PARTITION BY `Customer name`) AS SUM_profit_each_customer,
                   SUM(Profit) OVER (PARTITION BY Region) AS SUM_profit_each_region
         FROM      df_py_SQL 
         WHERE     Profit > 0 
         ORDER BY  `Customer name`"""


spark.sql(query).show(20)

how much contribution of each customer (across customer) in each region when return items(-) are not concerned
+--------------+--------+-------+--------+--------+------------------------+------------------------+----------------------+
| Customer name|  Profit| Region|Quantity|Discount|MAX_profit_each_customer|SUM_profit_each_customer|SUM_profit_each_region|
+--------------+--------+-------+--------+--------+------------------------+------------------------+----------------------+
| Aaron Bergman|  5.4801|   West|       1|     0.2|                 62.1544|      131.87130000000002|    129822.30940000013|
| Aaron Bergman|  4.6644|   West|       3|       0|                 62.1544|      131.87130000000002|    129822.30940000013|
| Aaron Bergman| 54.7136|Central|       2|       0|                 62.1544|      131.87130000000002|     95473.11760000007|
| Aaron Bergman| 62.1544|Central|       2|       0|                 62.1544|      131.87130000000002|     95473.11760000007|
| Aaron Bergma

### Time series wrangling

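Note: column types matter a lot in time-series wrangling. Dates read from the CSV are strings, so convert them with `F.to_date` or `F.to_timestamp` before comparing dates or doing date arithmetic.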
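Sorting or comparing date strings only works by accident when they are in ISO `yyyy-MM-dd` form, which the Superstore file here appears to use. The plain-Python sketch below uses hypothetical US-style strings to show what goes wrong otherwise. In Spark, the corresponding steps would be `F.to_date(col, "M/d/yyyy")` and `F.datediff(end, start)`:

```python
from datetime import datetime

# Hypothetical M/d/yyyy date strings (not taken from the Superstore data).
dates = ["11/11/2017", "6/16/2017", "10/18/2016"]


def parse(s: str) -> datetime:
    """Parse an M/d/yyyy string; %m and %d also accept single-digit values."""
    return datetime.strptime(s, "%m/%d/%Y")


print(sorted(dates))             # text order: ['10/18/2016', '11/11/2017', '6/16/2017']
print(sorted(dates, key=parse))  # date order: ['10/18/2016', '6/16/2017', '11/11/2017']

# Date arithmetic only makes sense after conversion.
print((parse("11/11/2017") - parse("6/16/2017")).days)  # 148
```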

In [0]:
df_py.printSchema()

root
 |-- Row ID: string (nullable = true)
 |-- Order ID: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Ship Date: string (nullable = true)
 |-- Ship Mode: string (nullable = true)
 |-- Customer ID: string (nullable = true)
 |-- Customer Name: string (nullable = true)
 |-- Segment: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Postal Code: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Product ID: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Sub-Category: string (nullable = true)
 |-- Product Name: string (nullable = true)
 |-- Sales: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- Discount: string (nullable = true)
 |-- Profit: string (nullable = true)



In [0]:
# Timestamp format
df_py.select(F.to_timestamp(df_py["Ship Date"], 'yyyy-MM-dd').alias("Ship Date")).show(3)

# Date format
df_py.select(F.to_date(df_py["Ship Date"])).show(3)

+-------------------+
|          Ship Date|
+-------------------+
|2017-11-11 00:00:00|
|2017-11-11 00:00:00|
|2017-06-16 00:00:00|
+-------------------+
only showing top 3 rows

+--------------------+
|to_date(`Ship Date`)|
+--------------------+
|          2017-11-11|
|          2017-11-11|
|          2017-06-16|
+--------------------+
only showing top 3 rows



Replace the string date columns with real date columns in the DataFrame

In [0]:
# new_df_py = df_py.drop("Ship Date", "Order Date")
new_df_py = df_py.withColumn("Ship Date (date)", F.to_date(df_py["Ship Date"]))\
                 .withColumn("Order Date (date)", F.to_date(df_py["Order Date"]))\
                 .drop("Order Date", "Ship Date") # drop the original
                 
new_df_py  = new_df_py.withColumnRenamed("Order Date (date)", "Order Date")\
                      .withColumnRenamed("Ship Date (date)", "Ship Date")

# optionally, sort the columns alphabetically:
# new_df_py = new_df_py.select(sorted(new_df_py.columns))

new_df_py.printSchema()            

root
 |-- Row ID: string (nullable = true)
 |-- Order ID: string (nullable = true)
 |-- Ship Mode: string (nullable = true)
 |-- Customer ID: string (nullable = true)
 |-- Customer Name: string (nullable = true)
 |-- Segment: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Postal Code: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Product ID: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Sub-Category: string (nullable = true)
 |-- Product Name: string (nullable = true)
 |-- Sales: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- Discount: string (nullable = true)
 |-- Profit: string (nullable = true)
 |-- Ship Date: date (nullable = true)
 |-- Order Date: date (nullable = true)

