In [78]:
from pyspark.ml.feature import *
from pyspark.sql.session import SparkSession
from pyspark.ml import *
from pyspark.sql import *
from pyspark.sql.functions import *
from functools import *

In [3]:
# spark session 필수 설정
spark = SparkSession.builder.appName("Test").config("spark.executor.instances", "1").config("spark.executor.cores", "1").config("spark.executor.memory", "2g").config("spark.driver.memory", "2g").getOrCreate()


In [23]:
# df 생성
staticDataFrame = spark.read.format('csv').option("header","true").option("inferSchema","true").load("./data/exam/retail-data/by-day/*.csv")

In [82]:
preppedDataFrame = staticDataFrame.na.fill(0).withColumn("day_of_week",date_format(col("InvoiceDate"),"EEEE")).coalesce(5)

In [83]:
preppedDataFrame.show()

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-----------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|day_of_week|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-----------+
|   580538|    23084|  RABBIT NIGHT LIGHT|      48|2011-12-05 08:38:00|     1.79|   14075.0|United Kingdom|     Monday|
|   580538|    23077| DOUGHNUT LIP GLOSS |      20|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|     Monday|
|   580538|    22906|12 MESSAGE CARDS ...|      24|2011-12-05 08:38:00|     1.65|   14075.0|United Kingdom|     Monday|
|   580538|    21914|BLUE HARMONICA IN...|      24|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|     Monday|
|   580538|    22467|   GUMBALL COAT RACK|       6|2011-12-05 08:38:00|     2.55|   14075.0|United Kingdom|     Monday|
|   580538|    21544|SKULLS  WATER TRA..

In [84]:
trainDataFrame = preppedDataFrame.where("InvoiceDate < '2011-07-01'")
testDataFrame = preppedDataFrame.where("InvoiceDate >= '2011-07-01'")
# trainDataFrame = 0
# testDataFrame = 0

In [86]:
trainDataFrame.columns
trainDataFrame.count()
testDataFrame.count()

296006

In [87]:
indexer = StringIndexer().setInputCol("day_of_week").setOutputCol("day_of_week_index")

In [88]:
indexer

StringIndexer_91ad64457c91

In [96]:
encoder = OneHotEncoder().setInputCol("day_of_week_index").setOutputCol("day_of_week_encoded")

In [97]:
vectorAssembler = VectorAssembler().setInputCols(["UnitPrice", "Quantity", "day_of_week_encoded"]).setOutputCol("features")

In [98]:
transformationPipeline = Pipeline().setStages([indexer,encoder,vectorAssembler])

In [99]:
fittedPipeline = transformationPipeline.fit(trainDataFrame)

In [95]:
staticDataFrame.printSchema()
in

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



StringIndexer_91ad64457c91

In [102]:
transformedTraining = fittedPipeline.transform(trainDataFrame)

In [103]:
transformedTraining.cache()

DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: timestamp, UnitPrice: double, CustomerID: double, Country: string, day_of_week: string, day_of_week_index: double, day_of_week_encoded: vector, features: vector]

In [105]:
from pyspark.ml.clustering import KMeans
kmeans = KMeans().setK(20).setSeed(1)

In [106]:
kmModel = kmeans.fit(transformedTraining)

In [107]:
transformedTest = fittedPipeline.transform(testDataFrame)

In [108]:
kmModel.computeCost(transformedTest)

517507094.7222117

In [116]:
### Data Type 다루기

df = spark.read.format("csv").option("header","true").option("inferSchema","true").load("./data/exam/retail-data/by-day/2010-12-01.csv")
df.printSchema()
df.show(5)
df.createOrReplaceTempView("dfTable")

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|   17850.0|United Kingdom|
|   536365| 

In [119]:
from pyspark.sql.functions import lit
df.select(lit(5),lit("five"),lit(5.0))

df.show(1)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 1 row



In [130]:
# 불리언 타입 다루기

df.where(col("InvoiceNo") != 536365).select("InvoiceNo", "Description").show(5,True)

+---------+--------------------+
|InvoiceNo|         Description|
+---------+--------------------+
|   536366|HAND WARMER UNION...|
|   536366|HAND WARMER RED P...|
|   536367|ASSORTED COLOUR B...|
|   536367|POPPY'S PLAYHOUSE...|
|   536367|POPPY'S PLAYHOUSE...|
+---------+--------------------+
only showing top 5 rows



In [131]:
df.where(col("InvoiceNo") != 536365).select("InvoiceNo", "Description").show(5,False)

+---------+-----------------------------+
|InvoiceNo|Description                  |
+---------+-----------------------------+
|536366   |HAND WARMER UNION JACK       |
|536366   |HAND WARMER RED POLKA DOT    |
|536367   |ASSORTED COLOUR BIRD ORNAMENT|
|536367   |POPPY'S PLAYHOUSE BEDROOM    |
|536367   |POPPY'S PLAYHOUSE KITCHEN    |
+---------+-----------------------------+
only showing top 5 rows



In [136]:
df.columns

['InvoiceNo',
 'StockCode',
 'Description',
 'Quantity',
 'InvoiceDate',
 'UnitPrice',
 'CustomerID',
 'Country']

In [140]:
df.stat.crosstab("InvoiceDate","Country").show()
# stockcode 고객 아이디 - 국가 배송?


+--------------------+---------+----+------+-------+-----------+------+--------------+
| InvoiceDate_Country|Australia|EIRE|France|Germany|Netherlands|Norway|United Kingdom|
+--------------------+---------+----+------+-------+-----------+------+--------------+
|2010-12-01 12:40:...|        0|   0|     0|      0|          0|     0|             5|
|2010-12-01 16:15:...|        0|   0|     0|      0|          0|     0|             8|
|2010-12-01 08:34:...|        0|   0|     0|      0|          0|     0|            16|
|2010-12-01 15:21:...|        0|   0|     0|      0|          0|     0|             6|
|2010-12-01 12:35:...|        0|   0|     0|      0|          0|     0|            15|
|2010-12-01 10:19:...|        0|   0|     0|      0|          0|     0|            24|
|2010-12-01 10:03:...|       14|   0|     0|      0|          0|     0|             0|
|2010-12-01 15:46:...|        0|   0|     0|      0|          0|     0|             1|
|2010-12-01 16:25:...|        0|   0|     0

In [146]:
df.na.drop("all",subset=["StockCode","InvoiceNo"])
df.na.replace([""],["UNKNOWN"],"Description")

DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: timestamp, UnitPrice: double, CustomerID: double, Country: string]

In [154]:
from pyspark.sql.functions import struct
complexDF = df.select(struct("Description","InvoiceNo").alias("complex"))
complexDF.createOrReplaceTempView("complexDF")
complexDF.show(5, False)

+---------------------------------------------+
|complex                                      |
+---------------------------------------------+
|[WHITE HANGING HEART T-LIGHT HOLDER, 536365] |
|[WHITE METAL LANTERN, 536365]                |
|[CREAM CUPID HEARTS COAT HANGER, 536365]     |
|[KNITTED UNION FLAG HOT WATER BOTTLE, 536365]|
|[RED WOOLLY HOTTIE WHITE HEART., 536365]     |
+---------------------------------------------+
only showing top 5 rows



In [169]:
from pyspark.sql.functions import split, size, explode
# split
df.select(split(col("Description"), " ")).show(3,False)
# size
df.select(size(split(col("Description"), " "))).show(3)
# explode
df.withColumn("splitted", split(col("Description"), " "))\
.withColumn("exploded", explode(col("splitted")))\
.select("Description","InvoiceNo","exploded").show(3,False)

+----------------------------------------+
|split(Description,  )                   |
+----------------------------------------+
|[WHITE, HANGING, HEART, T-LIGHT, HOLDER]|
|[WHITE, METAL, LANTERN]                 |
|[CREAM, CUPID, HEARTS, COAT, HANGER]    |
+----------------------------------------+
only showing top 3 rows

+---------------------------+
|size(split(Description,  ))|
+---------------------------+
|                          5|
|                          3|
|                          5|
+---------------------------+
only showing top 3 rows

+----------------------------------+---------+--------+
|Description                       |InvoiceNo|exploded|
+----------------------------------+---------+--------+
|WHITE HANGING HEART T-LIGHT HOLDER|536365   |WHITE   |
|WHITE HANGING HEART T-LIGHT HOLDER|536365   |HANGING |
|WHITE HANGING HEART T-LIGHT HOLDER|536365   |HEART   |
+----------------------------------+---------+--------+
only showing top 3 rows



In [176]:
from pyspark.sql.functions import create_map
# 배열과 동일한 방법으로 key value 출력
df.select(create_map(col("Description"), col("InvoiceNo")).alias("complex_map")).show(3,False)

# selectexpr에 키를 조회하여 키값 출력 없으면 null출력
df.select(create_map("Description", "InvoiceNo").alias("complex_map"))\
.selectExpr("complex_map['WHITE METAL LANTERN']").show(3)

# key : value 값으로 출력
df.select(create_map("Description", "InvoiceNo").alias("complex_map"))\
.selectExpr("explode(complex_map)").show(3,False)


+----------------------------------------------+
|complex_map                                   |
+----------------------------------------------+
|[WHITE HANGING HEART T-LIGHT HOLDER -> 536365]|
|[WHITE METAL LANTERN -> 536365]               |
|[CREAM CUPID HEARTS COAT HANGER -> 536365]    |
+----------------------------------------------+
only showing top 3 rows

+--------------------------------+
|complex_map[WHITE METAL LANTERN]|
+--------------------------------+
|                            null|
|                          536365|
|                            null|
+--------------------------------+
only showing top 3 rows

+----------------------------------+------+
|key                               |value |
+----------------------------------+------+
|WHITE HANGING HEART T-LIGHT HOLDER|536365|
|WHITE METAL LANTERN               |536365|
|CREAM CUPID HEARTS COAT HANGER    |536365|
+----------------------------------+------+
only showing top 3 rows



In [193]:
# JSON 다루기
from pyspark.sql.functions import *
from pyspark.sql.types import *

jsonDF = spark.range(1).selectExpr("""'{"myJSONKey":{"myJSONValue":[1,2,3]}}'as jsonString""")
jsonDF.show(1,False)

# get_json으로 내부 객체에 대한 값들을 하나 하나씩 기자오기 가능/tuple은 안돼 
jsonDF.select(get_json_object('jsonString','$.myJSONKey.myJSONValue[1]').alias("column"),
json_tuple("jsonString", "myJSONKey")).show(3,False)

# 컬럼이름을 key값으로 record를 value로 json형식 만들기
df.selectExpr("(InvoiceNo, Description) as myStruct").select(to_json("myStruct")).show(3,False)

#
parseSchema = StructType((StructField("InvoiceNo", StringType(),True),
                         StructField("Description", StringType(),True)))

df.selectExpr("(InvoiceNo, Description) as myStruct")\
.select(to_json("myStruct").alias("newJSON"))\
.select(from_json("newJSON", parseSchema), "newJSON").show(3,False)

+-------------------------------------+
|jsonString                           |
+-------------------------------------+
|{"myJSONKey":{"myJSONValue":[1,2,3]}}|
+-------------------------------------+

+------+-----------------------+
|column|c0                     |
+------+-----------------------+
|2     |{"myJSONValue":[1,2,3]}|
+------+-----------------------+

+-------------------------------------------------------------------------+
|structstojson(myStruct)                                                  |
+-------------------------------------------------------------------------+
|{"InvoiceNo":"536365","Description":"WHITE HANGING HEART T-LIGHT HOLDER"}|
|{"InvoiceNo":"536365","Description":"WHITE METAL LANTERN"}               |
|{"InvoiceNo":"536365","Description":"CREAM CUPID HEARTS COAT HANGER"}    |
+-------------------------------------------------------------------------+
only showing top 3 rows

+--------------------------------------------+------------------------------

In [205]:
# UDF : 사용자 정의 함수 (user define function)

# 함수 생성(정의)
udfExampleDF = spark.range(5).toDF("num")
def power3(double_value):
    return double_value**3
power3(2.0)

# 함수 등록
power3udf = udf(power3)

udfExampleDF.select(power3udf("num")).show()

# int 타입으로 하면 출력되지만 더블타입으로 바꾸면 null 출력
spark.udf.register("power3py", power3, IntegerType()) # 여기!
udfExampleDF.selectExpr("power3py(num)").show(3)


+-----------+
|power3(num)|
+-----------+
|          0|
|          1|
|          8|
|         27|
|         64|
+-----------+

+-------------+
|power3py(num)|
+-------------+
|            0|
|            1|
|            8|
+-------------+
only showing top 3 rows



In [208]:
df = spark.read.format("csv").option("header","true").option("inferSchema","true").load("./data/exam/retail-data/all/*.csv").coalesce(5)
df.printSchema()
df.show(5)
df.createOrReplaceTempView("dfTable")

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|  

In [209]:
# 집계함수
df.count()

541909

In [213]:
# 비대칭, 첨도
df.select(skewness("Quantity"), kurtosis("Quantity")).show()

# 상관계수(corr), 표본공분산(covar_samp), 모공분산(covar_pop)
df.select(corr("InvoiceNo","Quantity"), covar_samp("InvoiceNo","Quantity"),covar_pop("InvoiceNo","Quantity")).show()

+--------------------+------------------+
|  skewness(Quantity)|kurtosis(Quantity)|
+--------------------+------------------+
|-0.26407557610528376|119768.05495530753|
+--------------------+------------------+

+-------------------------+-------------------------------+------------------------------+
|corr(InvoiceNo, Quantity)|covar_samp(InvoiceNo, Quantity)|covar_pop(InvoiceNo, Quantity)|
+-------------------------+-------------------------------+------------------------------+
|     4.912186085640497E-4|             1052.7280543915997|            1052.7260778754955|
+-------------------------+-------------------------------+------------------------------+



In [218]:
# 복합데이터 집계
df.agg(collect_set("Country"), collect_list("Country")).show()

+--------------------+---------------------+
|collect_set(Country)|collect_list(Country)|
+--------------------+---------------------+
|[Portugal, Italy,...| [United Kingdom, ...|
+--------------------+---------------------+



In [226]:
# 그룹화
df.groupBy("InvoiceNo", "CustomerId").count().show(3)

# 그룹화 집계
df.groupBy("InvoiceNo").agg(count("Quantity").alias("Quan"), expr("count(Quantity)")).show(3)

+---------+----------+-----+
|InvoiceNo|CustomerId|count|
+---------+----------+-----+
|   536846|     14573|   76|
|   537026|     12395|   12|
|   537883|     14437|    5|
+---------+----------+-----+
only showing top 3 rows

+---------+----+---------------+
|InvoiceNo|Quan|count(Quantity)|
+---------+----+---------------+
|   536596|   6|              6|
|   536938|  14|             14|
|   537252|   1|              1|
+---------+----+---------------+
only showing top 3 rows



In [243]:
# Window 함수
dfWithDate = df.withColumn("date", to_date("InvoiceDate", "MM/d/yyyy H:mm"))
dfWithDate.createOrReplaceTempView("dfWithDate")
dfWithDate.show(3)

from pyspark.sql.window import Window
# 프레임 명세(rowsBetween)구문 사용하여 참조해 로우 포함 확인
windowSpec = Window.partitionBy("CustomerId","date").orderBy(desc("Quantity"))\
.rowsBetween(Window.unboundedPreceding, Window.currentRow)

# 해당 조건의 컬럼을 표현식으로 반환해 select구문에 사용가능
maxPurchaseQuantity = max("Quantity").over(windowSpec)
maxPurchaseQuantity.columns

# 중복로우 제거 위해 dense_rank사용(위와 같이 표현식 반환)
purchaseDenseRank = dense_rank().over(windowSpec)
purchaseRank = rank().over(windowSpec)

# 위에 정의를 사용해 해당 결과 출력
dfWithDate.where("CustomerId IS NOT NULL").orderBy("CustomerId")\
.select("CustomerId","date","Quantity",purchaseRank.alias("quantityRank")\
,purchaseDenseRank.alias("quantityDenseRank"),maxPurchaseQuantity.alias("maxPurchaseQuantity")).show(5)

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+----------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|      date|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+----------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|2010-12-01|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|2010-12-01|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|     17850|United Kingdom|2010-12-01|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+----------+
only showing top 3 rows

+----------+----------+--------+------------+-----------------+-------------------+
|CustomerId|      date|Quantity|quantityRank|quantityDenseRank|maxPurchaseQuantity|
+--------

In [247]:
# 그룹화 셋
dfNoNull = dfWithDate.drop()
dfNoNull.createOrReplaceTempView("dfNoNull")
dfNoNull.show(3)

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+----------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|      date|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+----------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|2010-12-01|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|2010-12-01|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|     17850|United Kingdom|2010-12-01|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+----------+
only showing top 3 rows



In [254]:
# Roll up
# `(백 틱) 유의해서 그룹바이 스타일의 다양한 연산수행의 다차원 집계기능
rolledUpDF = dfNoNull.rollup("Date", "Country").agg(sum("Quantity"))\
.selectExpr("Date","Country","`sum(Quantity)` as total_quantity")\
.orderBy("Date")

rolledUpDF.show(5)

rolledUpDF.where("Country IS NULL").show(3)
rolledUpDF.where("Country IS NOT NULL").show(3)
rolledUpDF.where("Date IS NULL").show(3)

+----------+---------+--------------+
|      Date|  Country|total_quantity|
+----------+---------+--------------+
|      null|     null|       5176450|
|2010-12-01|  Germany|           117|
|2010-12-01|   Norway|          1852|
|2010-12-01|Australia|           107|
|2010-12-01|     null|         26814|
+----------+---------+--------------+
only showing top 5 rows

+----------+-------+--------------+
|      Date|Country|total_quantity|
+----------+-------+--------------+
|      null|   null|       5176450|
|2010-12-01|   null|         26814|
|2010-12-02|   null|         21023|
+----------+-------+--------------+
only showing top 3 rows

+----------+---------+--------------+
|      Date|  Country|total_quantity|
+----------+---------+--------------+
|2010-12-01|  Germany|           117|
|2010-12-01|Australia|           107|
|2010-12-01|   Norway|          1852|
+----------+---------+--------------+
only showing top 3 rows

+----+-------+--------------+
|Date|Country|total_quantity|
+----

In [269]:
# 큐브
# 한 테이블 안에 date로 묶은것 country로 묶은것 null 끼리 묶은거 다있다
dfNoNull.cube("Date","Country").agg(sum("Quantity"))\
.select("Date","Country","sum(Quantity)").orderBy("Date").show(5)


dfNoNull.cube("Country").agg(sum("Quantity"))\
.select("Country","sum(Quantity)").orderBy("Country").show(5)

+----+------------------+-------------+
|Date|           Country|sum(Quantity)|
+----+------------------+-------------+
|null|               USA|         1034|
|null|    Czech Republic|          592|
|null|           Denmark|         8188|
|null|European Community|          497|
|null|            Norway|        19247|
+----+------------------+-------------+
only showing top 5 rows

+---------+-------------+
|  Country|sum(Quantity)|
+---------+-------------+
|     null|      5176450|
|Australia|        83653|
|  Austria|         4827|
|  Bahrain|          260|
|  Belgium|        23152|
+---------+-------------+
only showing top 5 rows



In [286]:
# grouping_id
# 이거 왜 안돼!?
# dfNoNull.cube("Date", "Country").agg(grouping_id(), sum("Quantity"))\
# .orderBy(col("grouping_id()"),desc).show()

In [283]:
pivoted = dfWithDate.groupBy("date").pivot("Country").sum()

pivoted.where("date > '2011-12-05'").select("date","`USA_sum(CAST(Quantity AS BIGINT))`").show()

+----------+---------------------------------+
|      date|USA_sum(CAST(Quantity AS BIGINT))|
+----------+---------------------------------+
|2011-12-06|                             null|
|2011-12-09|                             null|
|2011-12-08|                             -196|
|2011-12-07|                             null|
+----------+---------------------------------+



In [285]:
# pivoted.printSchema()

In [5]:
spark.sql("""create table flights (
             dest_country_name string,
             origin_country_name string,
             count long)
             using json options (path '/home/lab01/data/exam/flight-data/json/2015-summary.json')""")

DataFrame[]

In [7]:
spark.sql("""describe table flights""")

DataFrame[col_name: string, data_type: string, comment: string]

In [10]:
spark.sql("""select * from flights""")

DataFrame[dest_country_name: string, origin_country_name: string, count: bigint]

In [11]:
rdd1 = spark.range(10).rdd
rdd1

MapPartitionsRDD[5] at javaToPython at NativeMethodAccessorImpl.java:0

In [15]:
rdd2 = spark.range(10).toDF("id").rdd.map(lambda row: row[0])
rdd2 

PythonRDD[25] at RDD at PythonRDD.scala:53

In [18]:
myCollection = "Spark The Definitive Guide : Big Data Processing Made Simple".split(" ")
myCollection

['Spark',
 'The',
 'Definitive',
 'Guide',
 ':',
 'Big',
 'Data',
 'Processing',
 'Made',
 'Simple']

In [21]:
words = spark.sparkContext.parallelize(myCollection, 2)
words.setName("myWords")
words.name()

'myWords'

In [35]:
words.distinct().count()
words.take(10)

['Spark',
 'The',
 'Definitive',
 'Guide',
 ':',
 'Big',
 'Data',
 'Processing',
 'Made',
 'Simple']

In [30]:
def startWithS(individual):
    return individual.startswith("S")


words.filter(lambda word : startWithS(word)).collect()

['Spark', 'Simple']

In [38]:
prdd = words.map(lambda word : (word.lower(),1))
prdd.collect()

[('spark', 1),
 ('the', 1),
 ('definitive', 1),
 ('guide', 1),
 (':', 1),
 ('big', 1),
 ('data', 1),
 ('processing', 1),
 ('made', 1),
 ('simple', 1)]

In [62]:
keyword = words.keyBy(lambda word : (word.lower()[0]))
keyword.collect()
prdd2 = keyword.mapValues(lambda word : word.upper())
# prdd2.collect()
# prdd2.take(5)
# prdd2.keys().collect()
# prdd2.values().collect()
prdd2.lookup("s")

['SPARK', 'SIMPLE']

In [65]:
import random

distinctChars = words.flatMap(lambda word : list(word.lower())).distinct().collect()
distinctChars

['s',
 'p',
 'r',
 'h',
 'd',
 'i',
 'g',
 'b',
 'c',
 'l',
 'a',
 'k',
 't',
 'e',
 'f',
 'n',
 'v',
 'u',
 ':',
 'o',
 'm']

In [67]:
chars = words.flatMap(lambda word : word.lower())
KVcharacters = chars.map(lambda letter : (letter,1))

def maxFunc(left, right):
    return max(left, right)
def addFunc(left, right):
    return left + right
nums = spark.sparkContext.parallelize(range(1,31),5)

In [73]:
KVcharacters.countByKey()
# KVcharacters.collect()
# chars.collect()

defaultdict(int,
            {'s': 4,
             'p': 3,
             'a': 4,
             'r': 2,
             'k': 1,
             't': 3,
             'h': 1,
             'e': 7,
             'd': 4,
             'f': 1,
             'i': 7,
             'n': 2,
             'v': 1,
             'g': 3,
             'u': 1,
             ':': 1,
             'b': 1,
             'o': 1,
             'c': 1,
             'm': 2,
             'l': 1})

In [79]:
KVcharacters.groupByKey().map(lambda row : (row[0], reduce(addFunc, row[1]))).collect()

[('s', 4),
 ('p', 3),
 ('r', 2),
 ('h', 1),
 ('d', 4),
 ('i', 7),
 ('g', 3),
 ('b', 1),
 ('c', 1),
 ('l', 1),
 ('a', 4),
 ('k', 1),
 ('t', 3),
 ('e', 7),
 ('f', 1),
 ('n', 2),
 ('v', 1),
 ('u', 1),
 (':', 1),
 ('o', 1),
 ('m', 2)]

In [80]:
KVcharacters.reduceByKey(addFunc).collect()

[('s', 4),
 ('p', 3),
 ('r', 2),
 ('h', 1),
 ('d', 4),
 ('i', 7),
 ('g', 3),
 ('b', 1),
 ('c', 1),
 ('l', 1),
 ('a', 4),
 ('k', 1),
 ('t', 3),
 ('e', 7),
 ('f', 1),
 ('n', 2),
 ('v', 1),
 ('u', 1),
 (':', 1),
 ('o', 1),
 ('m', 2)]

In [86]:
distinctChars = words.flatMap(lambda word : word.lower()).distinct()
distinctChars.collect()
keyedChars = distinctChars.map(lambda c : (c, random.random()))
outputPartitions = 10

KVcharacters.join(keyedChars).count()
KVcharacters.join(keyedChars, outputPartitions).count()

51

In [87]:
KVcharacters.join(keyedChars).collect()

[('s', (1, 0.7041826943009919)),
 ('s', (1, 0.7041826943009919)),
 ('s', (1, 0.7041826943009919)),
 ('s', (1, 0.7041826943009919)),
 ('p', (1, 0.6401363631797711)),
 ('p', (1, 0.6401363631797711)),
 ('p', (1, 0.6401363631797711)),
 ('r', (1, 0.3587596936347467)),
 ('r', (1, 0.3587596936347467)),
 ('i', (1, 0.9369982901689917)),
 ('i', (1, 0.9369982901689917)),
 ('i', (1, 0.9369982901689917)),
 ('i', (1, 0.9369982901689917)),
 ('i', (1, 0.9369982901689917)),
 ('i', (1, 0.9369982901689917)),
 ('i', (1, 0.9369982901689917)),
 ('g', (1, 0.19476578553764645)),
 ('g', (1, 0.19476578553764645)),
 ('g', (1, 0.19476578553764645)),
 ('b', (1, 0.7301099226028489)),
 ('c', (1, 0.07274743413993012)),
 ('l', (1, 0.29903368477377823)),
 ('a', (1, 0.08325542043385781)),
 ('a', (1, 0.08325542043385781)),
 ('a', (1, 0.08325542043385781)),
 ('a', (1, 0.08325542043385781)),
 ('e', (1, 0.635429482954934)),
 ('e', (1, 0.635429482954934)),
 ('e', (1, 0.635429482954934)),
 ('e', (1, 0.635429482954934)),
 ('e'