In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

In [8]:
spark = (SparkSession.builder.appName("challenge_1").getOrCreate())

In [9]:
schema = StructType([
    StructField("Order ID", StringType(), True),
    StructField("Product", StringType(), True),
    StructField("Quantity Ordered", StringType(), True),
    StructField("Price Each", StringType(), True),
    StructField("Order Date", StringType(), True),
    StructField("Purchase Address", StringType(), True)
])

In [10]:
file_path = "./testdata"
sales_df = (spark.read.format("csv")
            .option("header", True)
            .schema(schema)
            .load(file_path))


In [11]:
sales_df.show(10)

+--------+--------------------+----------------+----------+--------------+--------------------+
|Order ID|             Product|Quantity Ordered|Price Each|    Order Date|    Purchase Address|
+--------+--------------------+----------------+----------+--------------+--------------------+
|  295665|  Macbook Pro Laptop|               1|      1700|12/30/19 00:01|136 Church St, Ne...|
|  295666|  LG Washing Machine|               1|     600.0|12/29/19 07:03|562 2nd St, New Y...|
|  295667|USB-C Charging Cable|               1|     11.95|12/12/19 18:21|277 Main St, New ...|
|  295668|    27in FHD Monitor|               1|    149.99|12/22/19 15:13|410 6th St, San F...|
|  295669|USB-C Charging Cable|               1|     11.95|12/18/19 12:38|43 Hill St, Atlan...|
|  295670|AA Batteries (4-p...|               1|      3.84|12/31/19 22:58|200 Jefferson St,...|
|  295671|USB-C Charging Cable|               1|     11.95|12/16/19 15:10|928 12th St, Port...|
|  295672|USB-C Charging Cable|         

In [12]:
sales_df.printSchema()

root
 |-- Order ID: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Quantity Ordered: string (nullable = true)
 |-- Price Each: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Purchase Address: string (nullable = true)



## Data Preparation and Cleansing

### Remove NULL value and Bad Records

In [14]:
from pyspark.sql.functions import col

In [21]:
sales_df = sales_df.na.drop("any")

In [16]:
sales_df = sales_df.filter(col('Order ID') != 'Order ID')

In [22]:
sales_df.describe("Order ID", "Product", "Quantity Ordered", "Price Each", "Order Date", "Purchase Address").show()

23/02/19 19:26:29 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 1, schema size: 6
CSV file: file:///Users/danhai/Desktop/spark_test/sparkdf/testdata/persons.json




+-------+------------------+------------+------------------+-----------------+--------------+--------------------+
|summary|          Order ID|     Product|  Quantity Ordered|       Price Each|    Order Date|    Purchase Address|
+-------+------------------+------------+------------------+-----------------+--------------+--------------------+
|  count|            154242|      154242|            154242|           154242|        154242|              154242|
|   mean|239474.59845567355|        null|1.1243111474176943|183.9243648293594|          null|                null|
| stddev| 51566.42897384514|        null|0.4423322524834492|331.8177835932225|          null|                null|
|    min|            141234|20in Monitor|                 1|           109.99|01/01/19 03:07|1 11th St, Los An...|
|    max|            319670|      iPhone|                 9|           999.99|12/31/19 23:53|999 Wilson St, Sa...|
+-------+------------------+------------+------------------+-----------------+--

                                                                                

### Extract City and State from Address into New Columns

In [23]:
from pyspark.sql.functions import split

In [28]:
sales_df.select('Purchase Address', split(col('Purchase Address'), ',').getItem(1).alias('city')).show(10, False)

+-----------------------------------------+--------------+
|Purchase Address                         |city          |
+-----------------------------------------+--------------+
|136 Church St, New York City, NY 10001   | New York City|
|562 2nd St, New York City, NY 10001      | New York City|
|277 Main St, New York City, NY 10001     | New York City|
|410 6th St, San Francisco, CA 94016      | San Francisco|
|43 Hill St, Atlanta, GA 30301            | Atlanta      |
|200 Jefferson St, New York City, NY 10001| New York City|
|928 12th St, Portland, OR 97035          | Portland     |
|813 Hickory St, Dallas, TX 75001         | Dallas       |
|718 Wilson St, Dallas, TX 75001          | Dallas       |
|77 7th St, Dallas, TX 75001              | Dallas       |
+-----------------------------------------+--------------+
only showing top 10 rows



In [31]:
sales_df.select('Purchase Address', split(split(col('Purchase Address'), ',').getItem(2).alias('city'), ' ').getItem(1)).show(10, False)

+-----------------------------------------+----------------------------------------------------------+
|Purchase Address                         |split(split(Purchase Address, ,, -1)[2] AS city,  , -1)[1]|
+-----------------------------------------+----------------------------------------------------------+
|136 Church St, New York City, NY 10001   |NY                                                        |
|562 2nd St, New York City, NY 10001      |NY                                                        |
|277 Main St, New York City, NY 10001     |NY                                                        |
|410 6th St, San Francisco, CA 94016      |CA                                                        |
|43 Hill St, Atlanta, GA 30301            |GA                                                        |
|200 Jefferson St, New York City, NY 10001|NY                                                        |
|928 12th St, Portland, OR 97035          |OR                            

In [34]:
sales_df.show(10, False)

+--------+--------------------------+----------------+----------+--------------+-----------------------------------------+
|Order ID|Product                   |Quantity Ordered|Price Each|Order Date    |Purchase Address                         |
+--------+--------------------------+----------------+----------+--------------+-----------------------------------------+
|295665  |Macbook Pro Laptop        |1               |1700      |12/30/19 00:01|136 Church St, New York City, NY 10001   |
|295666  |LG Washing Machine        |1               |600.0     |12/29/19 07:03|562 2nd St, New York City, NY 10001      |
|295667  |USB-C Charging Cable      |1               |11.95     |12/12/19 18:21|277 Main St, New York City, NY 10001     |
|295668  |27in FHD Monitor          |1               |149.99    |12/22/19 15:13|410 6th St, San Francisco, CA 94016      |
|295669  |USB-C Charging Cable      |1               |11.95     |12/18/19 12:38|43 Hill St, Atlanta, GA 30301            |
|295670  |AA Bat

In [36]:
sales_df = (sales_df.withColumn('city', split(col('Purchase Address'), ',').getItem(1))
                       .withColumn('state', split(split(col('Purchase Address'), ',').getItem(2).alias('city'), ' ').getItem(1))
           )

In [38]:
sales_df.show(3, False)

+--------+--------------------+----------------+----------+--------------+--------------------------------------+--------------+-----+
|Order ID|Product             |Quantity Ordered|Price Each|Order Date    |Purchase Address                      |city          |state|
+--------+--------------------+----------------+----------+--------------+--------------------------------------+--------------+-----+
|295665  |Macbook Pro Laptop  |1               |1700      |12/30/19 00:01|136 Church St, New York City, NY 10001| New York City|NY   |
|295666  |LG Washing Machine  |1               |600.0     |12/29/19 07:03|562 2nd St, New York City, NY 10001   | New York City|NY   |
|295667  |USB-C Charging Cable|1               |11.95     |12/12/19 18:21|277 Main St, New York City, NY 10001  | New York City|NY   |
+--------+--------------------+----------------+----------+--------------+--------------------------------------+--------------+-----+
only showing top 3 rows

23/02/20 03:54:34 WARN Heartbe

In [None]:
def second_element(l)

In [None]:
sales_df.withColumn('city', l)