## 판다스로 해보기

In [108]:
import pandas as pd

# Read only the first four columns of the CSV and give them explicit names.
pandas_column_names = ['stationID', 'date', 'measure_type', 'temperature']
df = pd.read_csv('1800.csv', names=pandas_column_names, usecols=[0, 1, 2, 3])

# Preview the first rows.
df.head()

Unnamed: 0,stationID,date,measure_type,temperature
0,ITE00100554,18000101,TMAX,-75
1,ITE00100554,18000101,TMIN,-148
2,GM000010962,18000101,PRCP,0
3,EZE00100082,18000101,TMAX,-86
4,EZE00100082,18000101,TMIN,-135


In [93]:
# Keep only the rows whose measure_type is 'TMIN'.
is_tmin = df['measure_type'] == 'TMIN'
df_tmin = df[is_tmin]

df_tmin.head()

Unnamed: 0,stationID,date,measure_type,temperature
1,ITE00100554,18000101,TMIN,-148
4,EZE00100082,18000101,TMIN,-135
6,ITE00100554,18000102,TMIN,-125
9,EZE00100082,18000102,TMIN,-130
11,ITE00100554,18000103,TMIN,-46


In [94]:
# Narrow the TMIN rows down to the two columns used by the aggregation below.
wanted_columns = ['stationID', 'temperature']
df_tmin_select = df_tmin[wanted_columns]

df_tmin_select.head()

Unnamed: 0,stationID,temperature
1,ITE00100554,-148
4,EZE00100082,-135
6,ITE00100554,-125
9,EZE00100082,-130
11,ITE00100554,-46


In [101]:
# Smallest temperature value recorded for each station.
per_station = df_tmin_select.groupby('stationID')
per_station.min()

Unnamed: 0_level_0,temperature
stationID,Unnamed: 1_level_1
EZE00100082,-135
ITE00100554,-148


## Spark로 해보기

In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkConf

# Session configuration: application name and master URL.
conf = (
    SparkConf()
    .set('spark.app.name', 'PySpark DataFrame 1')
    .set('spark.master', 'local[*]')
)

# Return the existing session if there is one, otherwise create it from conf.
spark = SparkSession.builder.config(conf=conf).getOrCreate()

### 데이터 로드, 스키마 지정

In [3]:
# Load the CSV with default reader options: columns get generated names
# (_c0, _c1, ...) and every column is read as a string.
df = spark.read.csv('1800.csv')

df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)



In [4]:
# Same load as before, then rename all eight columns positionally; the last
# four keep their generated names.
renamed_columns = [
    'stationID', 'date', 'measure_type', 'temperature',
    '_c4', '_c5', '_c6', '_c7',
]
df = spark.read.format('csv').load('1800.csv').toDF(*renamed_columns)

df.printSchema()

root
 |-- stationID: string (nullable = true)
 |-- date: string (nullable = true)
 |-- measure_type: string (nullable = true)
 |-- temperature: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)



In [5]:
# Let Spark infer column types from the data, then rename the columns
# positionally as in the previous cell.
inferred_columns = [
    'stationID', 'date', 'measure_type', 'temperature',
    '_c4', '_c5', '_c6', '_c7',
]
df = (
    spark.read.format('csv')
    .option('inferschema', 'true')
    .load('1800.csv')
    .toDF(*inferred_columns)
)

df.printSchema()

root
 |-- stationID: string (nullable = true)
 |-- date: integer (nullable = true)
 |-- measure_type: string (nullable = true)
 |-- temperature: integer (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)



In [6]:
from pyspark.sql.types import StringType, IntegerType, FloatType, StructType, StructField

# Explicit schema: (column name, Spark type) pairs; every field is nullable.
field_specs = [
    ('stationID', StringType()),
    ('date', IntegerType()),
    ('measure_type', StringType()),
    ('temperature', FloatType()),
]
schema = StructType(
    [StructField(name, dtype, True) for name, dtype in field_specs]
)

# Read the CSV with the declared schema instead of inferring one.
df = spark.read.csv('1800.csv', schema=schema)

df.printSchema()

root
 |-- stationID: string (nullable = true)
 |-- date: integer (nullable = true)
 |-- measure_type: string (nullable = true)
 |-- temperature: float (nullable = true)



### 필터링

1. 데이터프레임의 `filter` 메소드
2. `where` 메소드 (`filter`의 별칭) -> 두 메소드 모두 **Column Expression** 방식 / **SQL Expression** 방식 사용 가능

In [7]:
# filter() with a Column expression: keep rows whose measure_type is 'TMIN'.
is_tmin_row = df['measure_type'] == 'TMIN'
a = df.filter(is_tmin_row)
a.count()

730

In [8]:
# where() with the same Column expression as the filter() cell.
b = df.where(df['measure_type'] == 'TMIN')
b.count()

730

In [9]:
# where() with the condition written as a SQL expression string.
tmin_condition = 'measure_type = "TMIN"'
c = df.where(tmin_condition)
c.count()

730

### 특정 컬럼만 선택하기

1. 리스트로 컬럼명을 지정하기
2. `select` 메소드를 사용하기

In [10]:
# Rows whose measure_type is 'TMIN'.
df_tmin = df.where(df['measure_type'] == 'TMIN')

# Column selection by indexing the DataFrame with a list of column names.
station_and_temp = df_tmin[['stationID', 'temperature']]
station_and_temp.show(5)

+-----------+-----------+
|  stationID|temperature|
+-----------+-----------+
|ITE00100554|     -148.0|
|EZE00100082|     -135.0|
|ITE00100554|     -125.0|
|EZE00100082|     -130.0|
|ITE00100554|      -46.0|
+-----------+-----------+
only showing top 5 rows



In [11]:
# The same column selection, this time through select().
selected = df_tmin.select('stationID', 'temperature')
selected.show(5)

+-----------+-----------+
|  stationID|temperature|
+-----------+-----------+
|ITE00100554|     -148.0|
|EZE00100082|     -135.0|
|ITE00100554|     -125.0|
|EZE00100082|     -130.0|
|ITE00100554|      -46.0|
+-----------+-----------+
only showing top 5 rows



### 그룹화

In [13]:
# Minimum temperature per station, using the lowercase groupby() spelling.
min_by_station = df_tmin.groupby('stationID').min('temperature')
min_by_station.show()

+-----------+----------------+
|  stationID|min(temperature)|
+-----------+----------------+
|ITE00100554|          -148.0|
|EZE00100082|          -135.0|
+-----------+----------------+



In [17]:
# The same aggregation, using the camelCase groupBy() spelling.
min_by_station = df_tmin.groupBy('stationID').min('temperature')
min_by_station.show()

+-----------+----------------+
|  stationID|min(temperature)|
+-----------+----------------+
|ITE00100554|          -148.0|
|EZE00100082|          -135.0|
+-----------+----------------+



In [18]:
# Aggregate on the cluster side, then bring the (small) result to the driver.
result = df_tmin.groupby('stationID').min('temperature')
results = result.collect()

# Each collected Row has two fields: the station id and its minimum temperature.
for station_id, min_temp in results:
    print(station_id, min_temp)

ITE00100554 -148.0
EZE00100082 -135.0


In [19]:
# Display the collected list of Row objects as-is.
results

[Row(stationID='ITE00100554', min(temperature)=-148.0),
 Row(stationID='EZE00100082', min(temperature)=-135.0)]

## Spark SQL로 처리하기

In [83]:
# Register the DataFrame as a temporary view so it can be queried with SQL.
df.createOrReplaceTempView('view1800')

# GROUP BY 1 groups by the first column of the SELECT list (stationID).
min_temp_query = """
    SELECT stationID, MIN(temperature) AS min
    FROM view1800
    WHERE measure_type = 'TMIN'
    GROUP BY 1
    """
results = spark.sql(min_temp_query).collect()

In [87]:
# Each Row from the query has two fields: stationID and the aliased minimum.
for station_id, min_temp in results:
    print(station_id, min_temp)

ITE00100554 -148.0
EZE00100082 -135.0
