In [2]:
import pyspark

myConf=pyspark.SparkConf()

spark= pyspark.sql.SparkSession\
    .builder\
    .master('local')\
    .appName('myApp')\
    .config(conf=myConf)\
    .getOrCreate()

In [3]:
myList=[('1', 'kim, js', 170),
       ('1', 'lee, sm', 175),
       ('2', 'lim, yg', 180),
       ('2', 'lee', 170)]

# DataFrame

### List로 Dataframe 생성(schema 설정X)

In [4]:
myDf= spark.createDataFrame(myList)

In [5]:
myDf.columns

['_1', '_2', '_3']

In [6]:
myDf.printSchema()

root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)
 |-- _3: long (nullable = true)



In [7]:
print(myDf.take(1))

[Row(_1='1', _2='kim, js', _3=170)]


### List로 Dataframe 생성(schema 설정O)

In [8]:
cols= ['year', 'name', 'height']
_myDf= spark.createDataFrame(myList, cols)

In [9]:
_myDf.columns

['year', 'name', 'height']

In [10]:
_myDf.printSchema()

root
 |-- year: string (nullable = true)
 |-- name: string (nullable = true)
 |-- height: long (nullable = true)



In [11]:
print(_myDf.take(1))

[Row(year='1', name='kim, js', height=170)]


### 임의의 데이터 100개로 Dataframe 생성(schema 설정O)

In [12]:
names=['kim', 'lee', 'lee', 'lim']
items=['espresso', 'latte', 'americano', 'affocato', 'long black', 'macciato']

In [13]:
coffeeDf= spark.createDataFrame([(names[i%4], items[i%6]) for i in range(100)],\
                              ["name", "coffee"]) #column= ["name", "coffee"]

In [14]:
coffeeDf.printSchema()

root
 |-- name: string (nullable = true)
 |-- coffee: string (nullable = true)



In [15]:
coffeeDf.show(10)

+----+----------+
|name|    coffee|
+----+----------+
| kim|  espresso|
| lee|     latte|
| lee| americano|
| lim|  affocato|
| kim|long black|
| lee|  macciato|
| lee|  espresso|
| lim|     latte|
| kim| americano|
| lee|  affocato|
+----+----------+
only showing top 10 rows



### Row 객체를 이용하여 Dictionary/DataFrame 생성

In [16]:
from pyspark.sql import Row
person= Row('year', 'name', 'height') 
row1= person('1', 'kim, js', 170) #열 이름이 person 리스트로 들어감

In [17]:
print("row1: ", row1.year, row1.name, row1.height)
print(type(row1))

row1:  1 kim, js 170
<class 'pyspark.sql.types.Row'>


Dictionary 생성

In [18]:
row1.asDict() # row -> dictionary

{'year': '1', 'name': 'kim, js', 'height': 170}

In [19]:
print(row1.asDict().keys())
print(row1.asDict().values())

dict_keys(['year', 'name', 'height'])
dict_values(['1', 'kim, js', 170])


DataFrame 생성

In [20]:
myRows=[row1,
       person('1', 'lee, sm', 175),
       person('2', 'lim, yg', 180),
       person('2', 'lee', 170)]
print(myRows)

[Row(year='1', name='kim, js', height=170), Row(year='1', name='lee, sm', height=175), Row(year='2', name='lim, yg', height=180), Row(year='2', name='lee', height=170)]


In [21]:
myDf= spark.createDataFrame(myRows)
# 모든 튜플에 person을 씌운 덕분에 myRows의 column명이 person으로 설정됨

In [22]:
print(myDf.printSchema())
myDf.show()

root
 |-- year: string (nullable = true)
 |-- name: string (nullable = true)
 |-- height: long (nullable = true)

None
+----+-------+------+
|year|   name|height|
+----+-------+------+
|   1|kim, js|   170|
|   1|lee, sm|   175|
|   2|lim, yg|   180|
|   2|    lee|   170|
+----+-------+------+



### schema 정의, 생성

In [23]:
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType, IntegerType

mySchema=StructType([ #구조체 선언
    StructField("year", StringType(), True), #column명, dataType, Null 허용여부
    StructField("name", StringType(), True),
    StructField("height", IntegerType(), True),
])

In [24]:
myDf= spark.createDataFrame(myRows, mySchema)

In [25]:
myDf.printSchema()

root
 |-- year: string (nullable = true)
 |-- name: string (nullable = true)
 |-- height: integer (nullable = true)



In [26]:
myDf.show()

+----+-------+------+
|year|   name|height|
+----+-------+------+
|   1|kim, js|   170|
|   1|lee, sm|   175|
|   2|lim, yg|   180|
|   2|    lee|   170|
+----+-------+------+



### RDD -> DF (Spark가 schema를 유추하게 됨)

In [27]:
myList=[('1', 'kim, js', 170),
       ('1', 'lee, sm', 175),
       ('2', 'lim, yg', 180),
       ('2', 'lee', 170)]

In [28]:
myRdd= spark.sparkContext.parallelize(myList)

In [29]:
rddDf= myRdd.toDF() # rdd -> DF 1번째 방법 
rddDf.printSchema() #schema는 자동으로 삽입됨

root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)
 |-- _3: long (nullable = true)



In [30]:
_rddDf= spark.createDataFrame(myRdd) # rdd -> DF 2번째 방법
_rddDf.printSchema()

root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)
 |-- _3: long (nullable = true)



### Row를 사용하여 Rdd 형변환 및 column 설정, 이후 rdd ->Df

In [31]:
myRdd.collect()

[('1', 'kim, js', 170),
 ('1', 'lee, sm', 175),
 ('2', 'lim, yg', 180),
 ('2', 'lee', 170)]

In [32]:
from pyspark.sql import Row
testRdd= myRdd.map(lambda x:Row(year=int(x[0]), name=x[1], height=int(x[2])))

In [33]:
testDF= spark.createDataFrame(testRdd)

In [34]:
testDF.printSchema()

root
 |-- height: long (nullable = true)
 |-- name: string (nullable = true)
 |-- year: long (nullable = true)



In [35]:
testDF.show()

+------+-------+----+
|height|   name|year|
+------+-------+----+
|   170|kim, js|   1|
|   175|lee, sm|   1|
|   180|lim, yg|   2|
|   170|    lee|   2|
+------+-------+----+



### Row를 사용하여 rdd 생성

In [36]:
from pyspark.sql import Row

r1= Row(name='js1', age=10)
r2= Row(name='js2', age=20)
myRdd= spark.sparkContext.parallelize([r1,r2])

In [37]:
myRdd.collect()

[Row(age=10, name='js1'), Row(age=20, name='js2')]

# Pandas

### DataFrame -> pandas

In [38]:
#myDf= spark.createDataFrame(myRows, mySchema)

In [39]:
myDf.toPandas()

Unnamed: 0,year,name,height
0,1,"kim, js",170
1,1,"lee, sm",175
2,2,"lim, yg",180
3,2,lee,170


### DF를 write.format()/pandas를 이용하여 csv로 내보내기 

write.format()

In [41]:
import os
# DF를 write.format('com.databricks.spark.csv')를 이용하여 csv로 내보내기
myDf.write.format('com.databricks.spark.csv').save(os.path.join('data', '_myDf.csv'))

In [42]:
!dir data/_myDf.csv

part-00000-24a116be-9f80-48e7-bb03-e5d548fde0fa-c000.csv  _SUCCESS


pandas

In [43]:
# DF를 pandas를 이용하여 csv로 내보내기
myDf.toPandas().to_csv(os.path.join("data", 'pandas_myDf.csv'))

In [44]:
!dir data/pandas_myDf.csv

data/pandas_myDf.csv


### pandas에서 column 생성, 삭제

In [46]:
import pandas as pd  
icc= pd.DataFrame({'contry': ['South Korea', 'Japen','Honkong'], 'codes':[81, 82, 83]})
#{column:[value]} 형식으로 column 설정

In [47]:
icc

Unnamed: 0,contry,codes
0,South Korea,81
1,Japen,82
2,Honkong,83


In [48]:
icc[icc['codes']==81]

Unnamed: 0,contry,codes
0,South Korea,81


### Rdd로 csv 파일을 읽어서 DF로 변환

In [49]:
%%writefile data/ds_spark_2cols.csv
35, 2
40, 27
12, 38
15, 31
21, 1
14, 19
46, 1
10, 34
28, 3
48, 1
16, 2
30, 3
32, 2
48, 1
31, 2
22, 1
12, 3
39, 29
19, 37
25, 2

Writing data/ds_spark_2cols.csv


In [50]:
from pyspark.sql import Row
cfile= os.path.join('data', 'ds_spark_2cols.csv')
lines= spark.sparkContext.textFile(cfile)

In [53]:
_col12= lines.map(lambda l: l.split(","))
col12= _col12.map(lambda p: Row(col1= int(p[0].strip()), col2= int(p[1].strip())))

rddCsv_myDf= spark.createDataFrame(col12)

In [54]:
rddCsv_myDf.printSchema()
rddCsv_myDf.collect()

root
 |-- col1: long (nullable = true)
 |-- col2: long (nullable = true)



[Row(col1=35, col2=2),
 Row(col1=40, col2=27),
 Row(col1=12, col2=38),
 Row(col1=15, col2=31),
 Row(col1=21, col2=1),
 Row(col1=14, col2=19),
 Row(col1=46, col2=1),
 Row(col1=10, col2=34),
 Row(col1=28, col2=3),
 Row(col1=48, col2=1),
 Row(col1=16, col2=2),
 Row(col1=30, col2=3),
 Row(col1=32, col2=2),
 Row(col1=48, col2=1),
 Row(col1=31, col2=2),
 Row(col1=22, col2=1),
 Row(col1=12, col2=3),
 Row(col1=39, col2=29),
 Row(col1=19, col2=37),
 Row(col1=25, col2=2)]

### DF로 csv를 직접 읽기

In [55]:
%%writefile data/ds_spark.csv
1, 2, 3, 4
11, 22, 33, 44
111, 222, 333, 444

Writing data/ds_spark.csv


format().load()

In [76]:
df= spark\
    .read\
    .format('com.databricks.spark.csv')\
    .options(header='true', inferschema=True, delimiter=',')\
    .load(os.path.join('data', 'ds_spark.csv'))

In [77]:
df.show()

+---+-----+-----+-----+
|  1|    2|    3|    4|
+---+-----+-----+-----+
| 11| 22.0| 33.0| 44.0|
|111|222.0|333.0|444.0|
+---+-----+-----+-----+



In [78]:
df.printSchema()

root
 |-- 1: integer (nullable = true)
 |--  2: double (nullable = true)
 |--  3: double (nullable = true)
 |--  4: double (nullable = true)



csv()

In [82]:
df= spark\
    .read\
    .options(header=True, inferschema=True, delimiter=',')\
    .csv(os.path.join('data', 'ds_spark.csv'))

In [83]:
df.show()

+---+-----+-----+-----+
|  1|    2|    3|    4|
+---+-----+-----+-----+
| 11| 22.0| 33.0| 44.0|
|111|222.0|333.0|444.0|
+---+-----+-----+-----+



### tsv파일 읽기

In [85]:
%%writefile data/ds_spark_heightweight.txt
1	65.78	112.99
2	71.52	136.49
3	69.40	153.03
4	68.22	142.34
5	67.79	144.30
6	68.70	123.30
7	69.80	141.49
8	70.01	136.46
9	67.90	112.37
10	66.78	120.67
11	66.49	127.45
12	67.62	114.14
13	68.30	125.61
14	67.12	122.46
15	68.28	116.09
16	71.09	140.00
17	66.46	129.50
18	68.65	142.97
19	71.23	137.90
20	67.13	124.04
21	67.83	141.28
22	68.88	143.54
23	63.48	97.90
24	68.42	129.50
25	67.63	141.85
26	67.21	129.72
27	70.84	142.42
28	67.49	131.55
29	66.53	108.33
30	65.44	113.89
31	69.52	103.30
32	65.81	120.75
33	67.82	125.79
34	70.60	136.22
35	71.80	140.10
36	69.21	128.75
37	66.80	141.80
38	67.66	121.23
39	67.81	131.35
40	64.05	106.71
41	68.57	124.36
42	65.18	124.86
43	69.66	139.67
44	67.97	137.37
45	65.98	106.45
46	68.67	128.76
47	66.88	145.68
48	67.70	116.82
49	69.82	143.62
50	69.09	134.93

Writing data/ds_spark_heightweight.txt


### tsv를 rdd로 읽기

In [86]:
from pyspark.sql.types import *
_tRdd= spark.sparkContext.textFile(os.path.join('data', 'ds_spark_heightweight.txt'))

In [90]:
_tRddSplt=_tRdd.map(lambda line: [float(x) for x in line.split('\t')])
_tRddSplt.take(5)

[[1.0, 65.78, 112.99],
 [2.0, 71.52, 136.49],
 [3.0, 69.4, 153.03],
 [4.0, 68.22, 142.34],
 [5.0, 67.79, 144.3]]

### tsv -> Rdd -> DF

In [96]:
tDF= spark.createDataFrame(_tRddSplt, ["id", "weight", "height"])
tDF.printSchema()

root
 |-- id: double (nullable = true)
 |-- weight: double (nullable = true)
 |-- height: double (nullable = true)



In [97]:
tDF.take(5)

[Row(id=1.0, weight=65.78, height=112.99),
 Row(id=2.0, weight=71.52, height=136.49),
 Row(id=3.0, weight=69.4, height=153.03),
 Row(id=4.0, weight=68.22, height=142.34),
 Row(id=5.0, weight=67.79, height=144.3)]

### tsv를 text()로 읽어서 #column=1을 #column=3으로 수정하기 

In [102]:
tText= spark.read.text(os.path.join('data', 'ds_spark_heightweight.txt'))

In [103]:
tText.printSchema()

root
 |-- value: string (nullable = true)



In [105]:
from pyspark.sql.functions import split
split_col= split(tText['value'], '\t')

In [106]:
tText= tText.withColumn('weight', split_col.getItem(1))
tText= tText.withColumn('height', split_col.getItem(2))

In [107]:
tText.show()

+---------------+------+------+
|          value|weight|height|
+---------------+------+------+
| 1	65.78	112.99| 65.78|112.99|
| 2	71.52	136.49| 71.52|136.49|
| 3	69.40	153.03| 69.40|153.03|
| 4	68.22	142.34| 68.22|142.34|
| 5	67.79	144.30| 67.79|144.30|
| 6	68.70	123.30| 68.70|123.30|
| 7	69.80	141.49| 69.80|141.49|
| 8	70.01	136.46| 70.01|136.46|
| 9	67.90	112.37| 67.90|112.37|
|10	66.78	120.67| 66.78|120.67|
|11	66.49	127.45| 66.49|127.45|
|12	67.62	114.14| 67.62|114.14|
|13	68.30	125.61| 68.30|125.61|
|14	67.12	122.46| 67.12|122.46|
|15	68.28	116.09| 68.28|116.09|
|16	71.09	140.00| 71.09|140.00|
|17	66.46	129.50| 66.46|129.50|
|18	68.65	142.97| 68.65|142.97|
|19	71.23	137.90| 71.23|137.90|
|20	67.13	124.04| 67.13|124.04|
+---------------+------+------+
only showing top 20 rows

