In [1]:
import findspark
findspark.init()
import pyspark

from pyspark.sql import SparkSession

spark = SparkSession.builder \
        .master("local") \
        .appName("TEST") \
        .getOrCreate()

In [2]:
spark

In [3]:
df = spark.range(500).toDF("number")

In [11]:
df.select(df["number"]+10).show(5)

+-------------+
|(number + 10)|
+-------------+
|           10|
|           11|
|           12|
|           13|
|           14|
+-------------+
only showing top 5 rows



In [12]:
df =    spark. \
        read. \
        format("json"). \
        load("/home/anaconda/data/spark/Spark-The-Definitive-Guide-master/data/flight-data/json/2015-summary.json")       
    

In [13]:
df.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)



In [14]:
df.schema

StructType(List(StructField(DEST_COUNTRY_NAME,StringType,true),StructField(ORIGIN_COUNTRY_NAME,StringType,true),StructField(count,LongType,true)))

In [62]:
from pyspark.sql.types import *

myManualScema = StructType(
    [
        StructField("DEST_COUNTRY_NAME",StringType(),True),
        StructField("ORIGIN_COUNTRY_NAME",StringType(),True),
        StructField("count",LongType(),True)
    ]
)

df =    spark. \
        read. \
        format("json"). \
        schema(myManualScema). \
        load("/home/anaconda/data/spark/Spark-The-Definitive-Guide-master/data/flight-data/json/2015-summary.json")       


In [19]:
df.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)



In [37]:
df.columns()

TypeError: 'list' object is not callable

In [22]:
df.show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+
only showing top 5 rows



In [39]:
df.take(3)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344)]

In [36]:
from pyspark.sql.functions import *

df.expr("DEST_COUNTRY_NAME")

AttributeError: 'DataFrame' object has no attribute 'expr'

In [38]:
df.first()

Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15)

In [40]:
from pyspark.sql import *

In [45]:
myRow = Row("Hello", None, 1)

In [42]:
myRow

<Row('Hello', None, 1, False)>

In [43]:
df.createOrReplaceTempView("dfTable")

In [46]:
myDf = spark.createDataFrame([myRow], myManualScema)
myDf.show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|            Hello|               null|    1|
+-----------------+-------------------+-----+



In [48]:
df.selectExpr("DEST_COUNTRY_NAME", "DEST_COUNTRY_NAME=ORIGIN_COUNTRY_NAME AS A").show()

+--------------------+-----+
|   DEST_COUNTRY_NAME|    A|
+--------------------+-----+
|       United States|false|
|       United States|false|
|       United States|false|
|               Egypt|false|
|       United States|false|
|       United States|false|
|       United States|false|
|          Costa Rica|false|
|             Senegal|false|
|             Moldova|false|
|       United States|false|
|       United States|false|
|              Guyana|false|
|               Malta|false|
|            Anguilla|false|
|             Bolivia|false|
|       United States|false|
|             Algeria|false|
|Turks and Caicos ...|false|
|       United States|false|
+--------------------+-----+
only showing top 20 rows



In [50]:
df.withColumn("numberOne", lit(1)).show(2)

+-----------------+-------------------+-----+---------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|numberOne|
+-----------------+-------------------+-----+---------+
|    United States|            Romania|   15|        1|
|    United States|            Croatia|    1|        1|
+-----------------+-------------------+-----+---------+
only showing top 2 rows



In [51]:
df.show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



In [54]:
df = df.withColumn("numberOne", lit(1))

In [55]:
df.show()

+--------------------+-------------------+-----+---------+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|numberOne|
+--------------------+-------------------+-----+---------+
|       United States|            Romania|   15|        1|
|       United States|            Croatia|    1|        1|
|       United States|            Ireland|  344|        1|
|               Egypt|      United States|   15|        1|
|       United States|              India|   62|        1|
|       United States|          Singapore|    1|        1|
|       United States|            Grenada|   62|        1|
|          Costa Rica|      United States|  588|        1|
|             Senegal|      United States|   40|        1|
|             Moldova|      United States|    1|        1|
|       United States|       Sint Maarten|  325|        1|
|       United States|   Marshall Islands|   39|        1|
|              Guyana|      United States|   64|        1|
|               Malta|      United States|    1|        

In [56]:
df.drop("numberOne").columns

['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count']

In [57]:
df

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint, numberOne: int]

In [58]:
df.show()

+--------------------+-------------------+-----+---------+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|numberOne|
+--------------------+-------------------+-----+---------+
|       United States|            Romania|   15|        1|
|       United States|            Croatia|    1|        1|
|       United States|            Ireland|  344|        1|
|               Egypt|      United States|   15|        1|
|       United States|              India|   62|        1|
|       United States|          Singapore|    1|        1|
|       United States|            Grenada|   62|        1|
|          Costa Rica|      United States|  588|        1|
|             Senegal|      United States|   40|        1|
|             Moldova|      United States|    1|        1|
|       United States|       Sint Maarten|  325|        1|
|       United States|   Marshall Islands|   39|        1|
|              Guyana|      United States|   64|        1|
|               Malta|      United States|    1|        

In [65]:
df = df.drop("numberOne")

In [66]:
df

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

In [67]:
df.show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|   15|
|       United States|            Croatia|    1|
|       United States|            Ireland|  344|
|               Egypt|      United States|   15|
|       United States|              India|   62|
|       United States|          Singapore|    1|
|       United States|            Grenada|   62|
|          Costa Rica|      United States|  588|
|             Senegal|      United States|   40|
|             Moldova|      United States|    1|
|       United States|       Sint Maarten|  325|
|       United States|   Marshall Islands|   39|
|              Guyana|      United States|   64|
|               Malta|      United States|    1|
|            Anguilla|      United States|   41|
|             Bolivia|      United States|   30|
|       United States|           Paraguay|    6|
|             Algeri

In [72]:
df.where(col("ORIGIN_COUNTRY_NAME") == "Croatia").show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
+-----------------+-------------------+-----+



In [75]:
df[df["ORIGIN_COUNTRY_NAME"] == "Croatia"].show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
+-----------------+-------------------+-----+



In [78]:
cond1 = df["ORIGIN_COUNTRY_NAME"] == "Croatia"
df[cond1]

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

In [85]:
df.selectExpr("ORIGIN_COUNTRY_NAME").count()

256

In [90]:

df.selectExpr("count").count()

256

In [91]:
df['count'].count_values()

TypeError: 'Column' object is not callable

In [92]:
df = spark. \
     read. \
     format('csv'). \
     option('header','true'). \
     load("/home/anaconda/data/spark/Spark-The-Definitive-Guide-master/data/retail-data/by-day/2010-12-01.csv")


In [93]:
df

DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: string, InvoiceDate: string, UnitPrice: string, CustomerID: string, Country: string]

In [94]:
df.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: string (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- Country: string (nullable = true)



In [95]:
df.schema

StructType(List(StructField(InvoiceNo,StringType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,StringType,true),StructField(InvoiceDate,StringType,true),StructField(UnitPrice,StringType,true),StructField(CustomerID,StringType,true),StructField(Country,StringType,true)))

In [114]:
mySchema = StructType(
    [
        StructField("InvoiceNo",StringType(),True),
        StructField("StockCode",StringType(),True),
        StructField("Description",StringType(),True),
        StructField("Quantity",IntegerType(),True),
        StructField("InvoiceDate",TimestampType(),True),
        StructField("UnitPrice",DoubleType(),True),
        StructField("CustomerID",StringType(),True),
        StructField("Country",StringType(),True)
    ]
)

In [115]:
df = spark. \
     read. \
     schema(mySchema). \
     format('csv'). \
     option('header','true'). \
     load("/home/anaconda/data/spark/Spark-The-Definitive-Guide-master/data/retail-data/by-day/2010-12-01.csv")


In [116]:
df.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- Country: string (nullable = true)



In [117]:
df.show()

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|   17850.0|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|2010-12-01 08:26:00|     7.65|   17850.0|United Kingdom|
|   536365|    21730|GLASS S

In [118]:
df.columns

['InvoiceNo',
 'StockCode',
 'Description',
 'Quantity',
 'InvoiceDate',
 'UnitPrice',
 'CustomerID',
 'Country']

In [124]:
df.where(col("StockCode")==71053) \
  .selectExpr('InvoiceNo',
 'StockCode',
 'Description',
 'Quantity',
 'InvoiceDate',
 'UnitPrice',
 'CustomerID',
 'Country') \
  .show(2,False)


+---------+---------+-------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|Description        |Quantity|InvoiceDate        |UnitPrice|CustomerID|Country       |
+---------+---------+-------------------+--------+-------------------+---------+----------+--------------+
|536365   |71053    |WHITE METAL LANTERN|6       |2010-12-01 08:26:00|3.39     |17850.0   |United Kingdom|
|536373   |71053    |WHITE METAL LANTERN|6       |2010-12-01 09:02:00|3.39     |17850.0   |United Kingdom|
+---------+---------+-------------------+--------+-------------------+---------+----------+--------------+
only showing top 2 rows



In [125]:
df.where("StockCode == 71053") \
  .selectExpr('InvoiceNo',
 'StockCode',
 'Description',
 'Quantity',
 'InvoiceDate',
 'UnitPrice',
 'CustomerID',
 'Country') \
  .show(2,False)

+---------+---------+-------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|Description        |Quantity|InvoiceDate        |UnitPrice|CustomerID|Country       |
+---------+---------+-------------------+--------+-------------------+---------+----------+--------------+
|536365   |71053    |WHITE METAL LANTERN|6       |2010-12-01 08:26:00|3.39     |17850.0   |United Kingdom|
|536373   |71053    |WHITE METAL LANTERN|6       |2010-12-01 09:02:00|3.39     |17850.0   |United Kingdom|
+---------+---------+-------------------+--------+-------------------+---------+----------+--------------+
only showing top 2 rows



In [126]:
from pyspark.sql.functions import *

In [156]:
fabricateQuantity = pow(col("Quantity") * col("UnitPrice"),2) + 5
df.selectExpr("CustomerID", "(pow(Quantity * UnitPrice,2) + 5) as adf").show(2)
# df.select(expr("CustomerID"), fabricateQuantity.alias("asdf")).show(2)

+----------+------------------+
|CustomerID|               adf|
+----------+------------------+
|   17850.0|239.08999999999997|
|   17850.0|          418.7156|
+----------+------------------+
only showing top 2 rows



In [159]:
df.select(round(lit("2.5")), bround(lit("2.5"))).show(2)

+-------------+--------------+
|round(2.5, 0)|bround(2.5, 0)|
+-------------+--------------+
|          3.0|           2.0|
|          3.0|           2.0|
+-------------+--------------+
only showing top 2 rows



In [162]:
df.selectExpr("round(2.5)", "bround(2.5)").show(2)

+-------------+--------------+
|round(2.5, 0)|bround(2.5, 0)|
+-------------+--------------+
|            3|             2|
|            3|             2|
+-------------+--------------+
only showing top 2 rows



In [164]:
df.describe().show()

+-------+-----------------+------------------+--------------------+------------------+------------------+------------------+--------------+
|summary|        InvoiceNo|         StockCode|         Description|          Quantity|         UnitPrice|        CustomerID|       Country|
+-------+-----------------+------------------+--------------------+------------------+------------------+------------------+--------------+
|  count|             3108|              3108|                3098|              3108|              3108|              1968|          3108|
|   mean| 536516.684944841|27834.304044117645|                null| 8.627413127413128| 4.151946589446603|15661.388719512195|          null|
| stddev|72.89447869788873|17407.897548583845|                null|26.371821677029203|15.638659854603892|1854.4496996893627|          null|
|    min|           536365|             10002| 4 PURPLE FLOCK D...|               -24|               0.0|           12431.0|     Australia|
|    max|          C

In [167]:
df.selectExpr("lower(Description)").show()

+--------------------+
|  lower(Description)|
+--------------------+
|white hanging hea...|
| white metal lantern|
|cream cupid heart...|
|knitted union fla...|
|red woolly hottie...|
|set 7 babushka ne...|
|glass star froste...|
|hand warmer union...|
|hand warmer red p...|
|assorted colour b...|
|poppy's playhouse...|
|poppy's playhouse...|
|feltcraft princes...|
|ivory knitted mug...|
|box of 6 assorted...|
|box of vintage ji...|
|box of vintage al...|
|home building blo...|
|love building blo...|
|recipe box with m...|
+--------------------+
only showing top 20 rows



In [169]:
dateDF = spark.range(10)\
            .withColumn("today", current_date())\
            .withColumn("now", current_timestamp())

dateDF.show()
    

+---+----------+--------------------+
| id|     today|                 now|
+---+----------+--------------------+
|  0|2021-02-28|2021-02-28 21:18:...|
|  1|2021-02-28|2021-02-28 21:18:...|
|  2|2021-02-28|2021-02-28 21:18:...|
|  3|2021-02-28|2021-02-28 21:18:...|
|  4|2021-02-28|2021-02-28 21:18:...|
|  5|2021-02-28|2021-02-28 21:18:...|
|  6|2021-02-28|2021-02-28 21:18:...|
|  7|2021-02-28|2021-02-28 21:18:...|
|  8|2021-02-28|2021-02-28 21:18:...|
|  9|2021-02-28|2021-02-28 21:18:...|
+---+----------+--------------------+



In [171]:
dateDF.selectExpr("date_add(today, 5)").show()

+------------------+
|date_add(today, 5)|
+------------------+
|        2021-03-05|
|        2021-03-05|
|        2021-03-05|
|        2021-03-05|
|        2021-03-05|
|        2021-03-05|
|        2021-03-05|
|        2021-03-05|
|        2021-03-05|
|        2021-03-05|
+------------------+



In [179]:
spark.range(10)\
    .withColumn("date", lit("2017-01-01"))\
    .select(to_date(col("date"))).printSchema()

root
 |-- to_date(`date`): date (nullable = true)



In [182]:
df.select(coalesce(col("Description")), coalesce(col("CustomerId"))).show()

+---------------------+--------------------+
|coalesce(Description)|coalesce(CustomerId)|
+---------------------+--------------------+
| WHITE HANGING HEA...|             17850.0|
|  WHITE METAL LANTERN|             17850.0|
| CREAM CUPID HEART...|             17850.0|
| KNITTED UNION FLA...|             17850.0|
| RED WOOLLY HOTTIE...|             17850.0|
| SET 7 BABUSHKA NE...|             17850.0|
| GLASS STAR FROSTE...|             17850.0|
| HAND WARMER UNION...|             17850.0|
| HAND WARMER RED P...|             17850.0|
| ASSORTED COLOUR B...|             13047.0|
| POPPY'S PLAYHOUSE...|             13047.0|
| POPPY'S PLAYHOUSE...|             13047.0|
| FELTCRAFT PRINCES...|             13047.0|
| IVORY KNITTED MUG...|             13047.0|
| BOX OF 6 ASSORTED...|             13047.0|
| BOX OF VINTAGE JI...|             13047.0|
| BOX OF VINTAGE AL...|             13047.0|
| HOME BUILDING BLO...|             13047.0|
| LOVE BUILDING BLO...|             13047.0|
| RECIPE B

In [185]:
df.select(nvl(col("Description"),'x'), coalesce(col("CustomerId"))).show()

NameError: name 'nvl' is not defined

In [186]:
df.columns

['InvoiceNo',
 'StockCode',
 'Description',
 'Quantity',
 'InvoiceDate',
 'UnitPrice',
 'CustomerID',
 'Country']

In [188]:
df.describe().show()

+-------+-----------------+------------------+--------------------+------------------+------------------+------------------+--------------+
|summary|        InvoiceNo|         StockCode|         Description|          Quantity|         UnitPrice|        CustomerID|       Country|
+-------+-----------------+------------------+--------------------+------------------+------------------+------------------+--------------+
|  count|             3108|              3108|                3098|              3108|              3108|              1968|          3108|
|   mean| 536516.684944841|27834.304044117645|                null| 8.627413127413128| 4.151946589446603|15661.388719512195|          null|
| stddev|72.89447869788873|17407.897548583845|                null|26.371821677029203|15.638659854603892|1854.4496996893627|          null|
|    min|           536365|             10002| 4 PURPLE FLOCK D...|               -24|               0.0|           12431.0|     Australia|
|    max|          C

In [189]:
df.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- Country: string (nullable = true)



In [204]:
df.where("Description is null")\
    .selectExpr("StockCode", "InvoiceDate as a")\
    .show()

+---------+-------------------+
|StockCode|                  a|
+---------+-------------------+
|    22139|2010-12-01 11:52:00|
|    21134|2010-12-01 14:32:00|
|    22145|2010-12-01 14:33:00|
|    37509|2010-12-01 14:33:00|
|   85226A|2010-12-01 14:34:00|
|    85044|2010-12-01 14:34:00|
|    20950|2010-12-01 14:34:00|
|    37461|2010-12-01 14:35:00|
|    84670|2010-12-01 14:35:00|
|    21777|2010-12-01 16:50:00|
+---------+-------------------+



In [209]:
df.where("Description is null")\
    .select(col("StockCode"), col("InvoiceDate").alias("a"))\
    .show()

+---------+-------------------+
|StockCode|                  a|
+---------+-------------------+
|    22139|2010-12-01 11:52:00|
|    21134|2010-12-01 14:32:00|
|    22145|2010-12-01 14:33:00|
|    37509|2010-12-01 14:33:00|
|   85226A|2010-12-01 14:34:00|
|    85044|2010-12-01 14:34:00|
|    20950|2010-12-01 14:34:00|
|    37461|2010-12-01 14:35:00|
|    84670|2010-12-01 14:35:00|
|    21777|2010-12-01 16:50:00|
+---------+-------------------+



In [210]:
df.show(4)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|   17850.0|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 4 rows



In [211]:
myRow = Row("11111","12312","aaa","5","2010-12-01 08:26:00","13","213","bbb")
new = spark.createDataFrame([myRow], df.columns)

In [212]:
new.show()

+---------+---------+-----------+--------+-------------------+---------+----------+-------+
|InvoiceNo|StockCode|Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|Country|
+---------+---------+-----------+--------+-------------------+---------+----------+-------+
|    11111|    12312|        aaa|       5|2010-12-01 08:26:00|       13|       213|    bbb|
+---------+---------+-----------+--------+-------------------+---------+----------+-------+



In [213]:
df = df.union(new)

In [214]:
df

DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: string, InvoiceDate: string, UnitPrice: string, CustomerID: string, Country: string]

In [216]:
df = spark.read\
        .format("csv")\
        .option("header","true")\
        .option("inferSchema","true")\
        .load("/home/anaconda/data/spark/Spark-The-Definitive-Guide-master/data/retail-data/all/*.csv")

In [218]:
df.describe()

DataFrame[summary: string, InvoiceNo: string, StockCode: string, Description: string, Quantity: string, InvoiceDate: string, UnitPrice: string, CustomerID: string, Country: string]

In [219]:
df.take(3)

[Row(InvoiceNo='536365', StockCode='85123A', Description='WHITE HANGING HEART T-LIGHT HOLDER', Quantity=6, InvoiceDate='12/1/2010 8:26', UnitPrice=2.55, CustomerID=17850, Country='United Kingdom'),
 Row(InvoiceNo='536365', StockCode='71053', Description='WHITE METAL LANTERN', Quantity=6, InvoiceDate='12/1/2010 8:26', UnitPrice=3.39, CustomerID=17850, Country='United Kingdom'),
 Row(InvoiceNo='536365', StockCode='84406B', Description='CREAM CUPID HEARTS COAT HANGER', Quantity=8, InvoiceDate='12/1/2010 8:26', UnitPrice=2.75, CustomerID=17850, Country='United Kingdom')]

In [221]:
df.describe().show()

+-------+------------------+------------------+--------------------+------------------+---------------+-----------------+------------------+-----------+
|summary|         InvoiceNo|         StockCode|         Description|          Quantity|    InvoiceDate|        UnitPrice|        CustomerID|    Country|
+-------+------------------+------------------+--------------------+------------------+---------------+-----------------+------------------+-----------+
|  count|            541909|            541909|              540455|            541909|         541909|           541909|            406829|     541909|
|   mean|  559965.752026781|27623.240210938104|             20713.0|  9.55224954743324|           null|4.611113626083471|15287.690570239585|       null|
| stddev|13428.417280805148| 16799.73762842771|                 NaN|218.08115785023355|           null|96.75985306117803|1713.6003033216148|       null|
|    min|            536365|             10002| 4 PURPLE FLOCK D...|            -8

In [222]:
mySchema = StructType(
    [
        StructField("InvoiceNo",StringType(),True),
        StructField("StockCode",StringType(),True),
        StructField("Description",StringType(),True),
        StructField("Quantity",IntegerType(),True),
        StructField("InvoiceDate",TimestampType(),True),
        StructField("UnitPrice",DoubleType(),True),
        StructField("CustomerID",StringType(),True),
        StructField("Country",StringType(),True)
    ]
)

In [224]:
df = spark.read\
        .format("csv")\
        .schema(mySchema)\
        .option("header","true")\
        .option("inferSchema","true")\
        .load("/home/anaconda/data/spark/Spark-The-Definitive-Guide-master/data/retail-data/all/*.csv")

In [225]:
df.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- Country: string (nullable = true)



In [227]:
df.count()

541909

In [237]:
df.select(countDistinct("Country")).show()

+-----------------------+
|count(DISTINCT Country)|
+-----------------------+
|                     38|
+-----------------------+



In [247]:
df.groupBy("Country","StockCode").agg(
  count("Country"))\
  .show()

TypeError: Column is not iterable