In [16]:
from pyspark.sql import SparkSession

In [17]:
spark = SparkSession.builder.appName("Basics").getOrCreate()

In [18]:
#Read the csv file as a dataframe. And, not as RDD. See the schema of the DF.
df = spark.read.csv('datasets/Wholesale customers data.csv',header=True)
df.show()

+-------+------+-----+-----+-------+------+----------------+----------+
|Channel|Region|Fresh| Milk|Grocery|Frozen|Detergents_Paper|Delicassen|
+-------+------+-----+-----+-------+------+----------------+----------+
|      2|     3|12669| 9656|   7561|   214|            2674|      1338|
|      2|     3| 7057| 9810|   9568|  1762|            3293|      1776|
|      2|     3| 6353| 8808|   7684|  2405|            3516|      7844|
|      1|     3|13265| 1196|   4221|  6404|             507|      1788|
|      2|     3|22615| 5410|   7198|  3915|            1777|      5185|
|      2|     3| 9413| 8259|   5126|   666|            1795|      1451|
|      2|     3|12126| 3199|   6975|   480|            3140|       545|
|      2|     3| 7579| 4956|   9426|  1669|            3321|      2566|
|      1|     3| 5963| 3648|   6192|   425|            1716|       750|
|      2|     3| 6006|11093|  18881|  1159|            7425|      2098|
|      2|     3| 3366| 5403|  12974|  4400|            5977|    

In [19]:
df.printSchema()

root
 |-- Channel: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Fresh: string (nullable = true)
 |-- Milk: string (nullable = true)
 |-- Grocery: string (nullable = true)
 |-- Frozen: string (nullable = true)
 |-- Detergents_Paper: string (nullable = true)
 |-- Delicassen: string (nullable = true)



In [20]:
df.createOrReplaceTempView("customersdata")

In [21]:
sql_op = spark.sql("SELECT * FROM customersdata")

In [22]:
sql_op.show()

+-------+------+-----+-----+-------+------+----------------+----------+
|Channel|Region|Fresh| Milk|Grocery|Frozen|Detergents_Paper|Delicassen|
+-------+------+-----+-----+-------+------+----------------+----------+
|      2|     3|12669| 9656|   7561|   214|            2674|      1338|
|      2|     3| 7057| 9810|   9568|  1762|            3293|      1776|
|      2|     3| 6353| 8808|   7684|  2405|            3516|      7844|
|      1|     3|13265| 1196|   4221|  6404|             507|      1788|
|      2|     3|22615| 5410|   7198|  3915|            1777|      5185|
|      2|     3| 9413| 8259|   5126|   666|            1795|      1451|
|      2|     3|12126| 3199|   6975|   480|            3140|       545|
|      2|     3| 7579| 4956|   9426|  1669|            3321|      2566|
|      1|     3| 5963| 3648|   6192|   425|            1716|       750|
|      2|     3| 6006|11093|  18881|  1159|            7425|      2098|
|      2|     3| 3366| 5403|  12974|  4400|            5977|    

In [23]:
#Use select to view a single column or a st of chosen columns.
df.select('Fresh').show()

+-----+
|Fresh|
+-----+
|12669|
| 7057|
| 6353|
|13265|
|22615|
| 9413|
|12126|
| 7579|
| 5963|
| 6006|
| 3366|
|13146|
|31714|
|21217|
|24653|
|10253|
| 1020|
| 5876|
|18601|
| 7780|
+-----+
only showing top 20 rows



In [24]:
#Use filter to see records with fresh sales more than 50000 only.
df.filter(df.Fresh > 50000).show()

+-------+------+------+-----+-------+------+----------------+----------+
|Channel|Region| Fresh| Milk|Grocery|Frozen|Detergents_Paper|Delicassen|
+-------+------+------+-----+-------+------+----------------+----------+
|      1|     3| 56159|  555|    902| 10002|             212|      2916|
|      1|     3| 56082| 3504|   8906| 18028|            1480|      2498|
|      1|     3| 76237| 3473|   7102| 16538|             778|       918|
|      1|     3|112151|29627|  18148| 16745|            4948|      8550|
|      1|     1| 56083| 4563|   2124|  6422|             730|      3321|
|      1|     1| 53205| 4959|   7336|  3012|             967|       818|
|      1|     3| 68951| 4411|  12609|  8692|             751|      2406|
+-------+------+------+-----+-------+------+----------------+----------+



In [25]:
from pyspark.sql.functions import count

In [26]:
#Create aggregates on channels and regions variables.
df.groupBy("Channel","Region").agg(count("*")).show()

+-------+------+--------+
|Channel|Region|count(1)|
+-------+------+--------+
|      1|     1|      59|
|      2|     3|     105|
|      2|     2|      19|
|      1|     2|      28|
|      1|     3|     211|
|      2|     1|      18|
+-------+------+--------+



In [27]:
#Use describe to see summary statistics on dataframe.
df.describe().show()

+-------+------------------+------------------+------------------+------------------+-----------------+-----------------+------------------+------------------+
|summary|           Channel|            Region|             Fresh|              Milk|          Grocery|           Frozen|  Detergents_Paper|        Delicassen|
+-------+------------------+------------------+------------------+------------------+-----------------+-----------------+------------------+------------------+
|  count|               440|               440|               440|               440|              440|              440|               440|               440|
|   mean|1.3227272727272728| 2.543181818181818|12000.297727272728| 5796.265909090909|7951.277272727273|3071.931818181818|2881.4931818181817|1524.8704545454545|
| stddev|0.4680515694791137|0.7742724492301002|12647.328865076885|7380.3771745708445|9503.162828994346|4854.673332592367| 4767.854447904201|2820.1059373693965|
|    min|                 1|            

In [28]:
from pyspark.sql.types import StringType

In [29]:
#Change datatype of Channels to Strings.
data_schema = df.withColumn("Channel", df["Channel"].cast(StringType()))

In [30]:
data_schema.printSchema()

root
 |-- Channel: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Fresh: string (nullable = true)
 |-- Milk: string (nullable = true)
 |-- Grocery: string (nullable = true)
 |-- Frozen: string (nullable = true)
 |-- Detergents_Paper: string (nullable = true)
 |-- Delicassen: string (nullable = true)

