In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr
from pyspark.sql.functions import col

In [2]:
spark = SparkSession.builder.master("local[*]") \
    .enableHiveSupport().appName("hive02") \
    .getOrCreate()

spark.version   

'2.4.5'

In [3]:
df2 = spark.createDataFrame([(1,'a',10),(2,'b',20),(3,'c',30),(4,'c',40)]).toDF("id","name","age")
df2.printSchema()
df2.show()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)

+---+----+---+
| id|name|age|
+---+----+---+
|  1|   a| 10|
|  2|   b| 20|
|  3|   c| 30|
|  4|   c| 40|
+---+----+---+



In [4]:
df2.write.mode("Overwrite").option("header", "true").csv("hdfs://127.0.0.1:9000/test1/df2")

In [4]:
df3 = spark.read.option("header", "true") \
     .csv('hdfs://127.0.0.1:9000/test1/df2')  

df3.printSchema()
df3.show()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)

+---+----+---+
| id|name|age|
+---+----+---+
|  1|   a| 10|
|  2|   b| 20|
|  3|   c| 30|
|  4|   c| 40|
+---+----+---+



In [6]:
df4 = df3.toPandas()
df4

Unnamed: 0,id,name,age
0,1,a,10
1,2,b,20
2,3,c,30
3,4,c,40


In [7]:
df5 = spark.createDataFrame(df4)
df5.show()

+---+----+---+
| id|name|age|
+---+----+---+
|  1|   a| 10|
|  2|   b| 20|
|  3|   c| 30|
|  4|   c| 40|
+---+----+---+



In [5]:
df = spark.read.format("json").load("./files1/2010-summary.json")
df.printSchema()
df.show()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|    1|
|       United States|            Ireland|  264|
|       United States|              India|   69|
|               Egypt|      United States|   24|
|   Equatorial Guinea|      United States|    1|
|       United States|          Singapore|   25|
|       United States|            Grenada|   54|
|          Costa Rica|      United States|  477|
|             Senegal|      United States|   29|
|       United States|   Marshall Islands|   44|
|              Guyana|      United States|   17|
|       United States|       Sint Maarten|   53|
|               Malta|      United States|    1|
|             Bolivia|      United States|   46|
|            Anguilla|    

In [14]:
dfPan=df.toPandas()
dfPan

Unnamed: 0,DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
0,United States,Romania,1
1,United States,Ireland,264
2,United States,India,69
3,Egypt,United States,24
4,Equatorial Guinea,United States,1
...,...,...,...
250,United States,French Guiana,1
251,United States,Haiti,226
252,United States,Uganda,1
253,"Bonaire, Sint Eustatius, and Saba",United States,16


## 문제 : ORIGIN_COUNTRY_NAME 가 United States인것만 출력하시오.

In [14]:
df.where(col("ORIGIN_COUNTRY_NAME") == 'United States')\
  .show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|               Egypt|      United States|   24|
|   Equatorial Guinea|      United States|    1|
|          Costa Rica|      United States|  477|
|             Senegal|      United States|   29|
|              Guyana|      United States|   17|
|               Malta|      United States|    1|
|             Bolivia|      United States|   46|
|            Anguilla|      United States|   21|
|Turks and Caicos ...|      United States|  136|
|Saint Vincent and...|      United States|    1|
|               Italy|      United States|  390|
|            Pakistan|      United States|    9|
|             Iceland|      United States|  118|
|    Marshall Islands|      United States|   77|
|          Luxembourg|      United States|   91|
|            Honduras|      United States|  391|
|         The Bahamas|      United States|  903|
|         El Salvado

In [12]:
dfPan=df.toPandas()
dfPan

Unnamed: 0,DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
0,United States,Romania,1
1,United States,Ireland,264
2,United States,India,69
3,Egypt,United States,24
4,Equatorial Guinea,United States,1
...,...,...,...
250,United States,French Guiana,1
251,United States,Haiti,226
252,United States,Uganda,1
253,"Bonaire, Sint Eustatius, and Saba",United States,16


In [13]:
dfPanUn=dfPan[dfPan["ORIGIN_COUNTRY_NAME"]=="United States"]
dfPanUn

Unnamed: 0,DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
3,Egypt,United States,24
4,Equatorial Guinea,United States,1
7,Costa Rica,United States,477
8,Senegal,United States,29
10,Guyana,United States,17
...,...,...,...
246,Uruguay,United States,54
247,Cook Islands,United States,13
249,Bulgaria,United States,1
253,"Bonaire, Sint Eustatius, and Saba",United States,16


# count 가 100보다 큰것만 출력하시오.

In [10]:
dfCo=df.where(col("count")>=100)
dfCo.show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Ireland|  264|
|          Costa Rica|      United States|  477|
|Turks and Caicos ...|      United States|  136|
|               Italy|      United States|  390|
|       United States|             Russia|  156|
|       United States|        Netherlands|  570|
|             Iceland|      United States|  118|
|            Honduras|      United States|  391|
|         The Bahamas|      United States|  903|
|         El Salvador|      United States|  519|
|         Switzerland|      United States|  315|
|           Hong Kong|      United States|  252|
| Trinidad and Tobago|      United States|  187|
|       United States|            Ecuador|  345|
|              Mexico|      United States| 6200|
|             Ecuador|      United States|  272|
|       United States|           Portugal|  104|
|       United State

In [16]:
dfPan[dfPan["count"]>=100]

Unnamed: 0,DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
1,United States,Ireland,264
7,Costa Rica,United States,477
15,Turks and Caicos Islands,United States,136
18,Italy,United States,390
19,United States,Russia,156
...,...,...,...
240,Australia,United States,290
242,United States,Cayman Islands,251
243,United States,Trinidad and Tobago,200
248,United States,Saint Kitts and Nevis,127


# DEST_COUNTRY_NAME가 Hondura인것만 출력하시오.

In [11]:
Honduras=df.where(col("DEST_COUNTRY_NAME") == 'Honduras')

Honduras.show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|         Honduras|      United States|  391|
+-----------------+-------------------+-----+



In [17]:
dfPanHo=dfPan[dfPan["DEST_COUNTRY_NAME"]=="Honduras"]
dfPanHo

Unnamed: 0,DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
26,Honduras,United States,391


In [26]:
from pyspark.sql import SparkSession
import pandas as pd


spark = SparkSession.builder.master("local[*]") \
        .enableHiveSupport().appName("hive01") \
        .config("spark.datasource.hive.metastore.uris","hdfs://127.0.0.1:9000") \
        .config("spark.sql.warehouse.dir","hdfs://127.0.0.1:9000/user/hive/warehouse") \
        .config("spark.sql.catalogImplementation","hive") \
        .getOrCreate()

df1 = spark.createDataFrame([(1,'a',10),(2,'b',20),(3,'c',30)]).toDF("id","name","age")
df1.createOrReplaceTempView("table1")  #데이터 프레임을 table1로 만듬
spark.sql("SELECT * FROM table1").show()

spark.sql("SELECT name,age FROM table1").show()
df1


+---+----+---+
| id|name|age|
+---+----+---+
|  1|   a| 10|
|  2|   b| 20|
|  3|   c| 30|
+---+----+---+

+----+---+
|name|age|
+----+---+
|   a| 10|
|   b| 20|
|   c| 30|
+----+---+



DataFrame[id: bigint, name: string, age: bigint]

In [22]:
spark.sql("create database db1")

DataFrame[]

In [24]:
spark.sql("create table db1.t01 as (select * from table1)")

DataFrame[]

In [27]:
spark.sql("INSERT INTO db1.t01 VALUES(11,'a',12)")
spark.sql("INSERT INTO db1.t01 VALUES(12,'a',22)")
spark.sql("INSERT INTO db1.t01 VALUES(13,'a',32)")

DataFrame[]

In [31]:
spark.sql("SELECT * FROM db1.t01").show()

+---+----+---+
| id|name|age|
+---+----+---+
| 11|   a| 12|
| 13|   a| 32|
| 12|   a| 22|
|  1|   a| 10|
|  2|   b| 20|
|  3|   c| 30|
+---+----+---+



In [40]:
df1 = spark.read.format("csv").load("./files2/2010-12-01.csv")
df1.printSchema()
df1.show()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|      _c0|      _c1|                 _c2|     _c3|                _c4|      _c5|       _c6|           _c7|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:2

In [45]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport() \
        .appName("ex01_1") \
        .master("local[*]") \
        .getOrCreate()

df = spark.read.format("csv")\
  .option("header", "true")\
  .option("inferSchema", "true")\
  .load("./files2/*.csv")
df.cache()



DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: timestamp, UnitPrice: double, CustomerID: double, Country: string]

In [42]:
from pyspark.sql.functions import count
df.select(count("StockCode")).show() # 541909

+----------------+
|count(StockCode)|
+----------------+
|          541909|
+----------------+



In [46]:
df.createOrReplaceTempView("table2") 

In [43]:
spark.sql("create database db2")

DataFrame[]

In [50]:
spark.sql("create table db1.t02 as(select * from table2)")

DataFrame[]

# 나라별 UnitPrice 합계를 구하시오.

In [51]:
spark.sql("SELECT * FROM db1.t02").show()

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   580538|    23084|  RABBIT NIGHT LIGHT|      48|2011-12-05 08:38:00|     1.79|   14075.0|United Kingdom|
|   580538|    23077| DOUGHNUT LIP GLOSS |      20|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|
|   580538|    22906|12 MESSAGE CARDS ...|      24|2011-12-05 08:38:00|     1.65|   14075.0|United Kingdom|
|   580538|    21914|BLUE HARMONICA IN...|      24|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|
|   580538|    22467|   GUMBALL COAT RACK|       6|2011-12-05 08:38:00|     2.55|   14075.0|United Kingdom|
|   580538|    21544|SKULLS  WATER TRA...|      48|2011-12-05 08:38:00|     0.85|   14075.0|United Kingdom|
|   580538|    23126|FELTCRA