## SparkSession 객체 생성 : Spark 기동과 초기화

In [3]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[2]") \
                    .appName('sparkedu') \
                    .getOrCreate()
spark

### Spark을 사용하여 hotel.txt파일의 내용으로 텍스트 마이닝을 해보자~
#### hotel.txt 파일에서 가장 많이 등장한 명사들을 많이 등장한 순으로 30개를 출력해 본다. 

In [4]:
hoteldf = spark.read.text("data/hotel.txt") 
hoteldf.show(3)

+-----------------------------------+
|                              value|
+-----------------------------------+
|대중교통을 이용한다면 비추입니다...|
|  위치는 좋았는데, 가격대비 방이...|
|           만족하고 나왔습니다 ㅎㅎ|
+-----------------------------------+
only showing top 3 rows



In [5]:
print(hoteldf.take(3))

[Row(value='대중교통을 이용한다면 비추입니다. 역과 애매한 거리입니다. 시설은 너무 좋았어요 어메니티도 만족해요'), Row(value='위치는 좋았는데, 가격대비 방이 너무 작았던것 같아요 ㅜ 청결이라던지, 직원들 응대는 좋았지만 강남에서 이가격이면 그냥 쏘쏘한것 같긴 하네요'), Row(value='만족하고 나왔습니다 ㅎㅎ')]


In [6]:
imsi = hoteldf.collect()
print(imsi[:3])

[Row(value='대중교통을 이용한다면 비추입니다. 역과 애매한 거리입니다. 시설은 너무 좋았어요 어메니티도 만족해요'), Row(value='위치는 좋았는데, 가격대비 방이 너무 작았던것 같아요 ㅜ 청결이라던지, 직원들 응대는 좋았지만 강남에서 이가격이면 그냥 쏘쏘한것 같긴 하네요'), Row(value='만족하고 나왔습니다 ㅎㅎ')]


In [7]:
hotellist = [ x[0] for x in imsi ]
print(hotellist[:3])

['대중교통을 이용한다면 비추입니다. 역과 애매한 거리입니다. 시설은 너무 좋았어요 어메니티도 만족해요', '위치는 좋았는데, 가격대비 방이 너무 작았던것 같아요 ㅜ 청결이라던지, 직원들 응대는 좋았지만 강남에서 이가격이면 그냥 쏘쏘한것 같긴 하네요', '만족하고 나왔습니다 ㅎㅎ']


In [8]:
hotelstr = ' '.join(hotellist)
print(hotelstr[:50])

대중교통을 이용한다면 비추입니다. 역과 애매한 거리입니다. 시설은 너무 좋았어요 어메니티도


In [None]:
import re

hangul = re.compile('[^ ㄱ-ㅣ가-힣]+') 
hangul.sub('', hotelstr) # 한글과 띄어쓰기를 제외한 모든 부분을 제거

In [10]:
from konlpy.tag import Okt                                  ## 다른 형태소를 클래스를 가져온다. 
okt = Okt()
hotelnoun = okt.nouns(hotelstr)
print(hotelnoun[:50])

['대중교통', '이용', '면', '역', '거리', '시설', '어메니티', '위치', '가격', '대비', '방이', '청결', '직원', '응대', '강남', '가격', '그냥', '것', '위치', '개', '역', '중간', '즈음', '짐', '날씨', '함', '화장실', '하수', '냄새', '후기', '약시', '냄새', '안나', '방', '요청', '체크', '인시', '더블체크', '함', '막상', '사용', '냄새', '화장실', '쪽', '자꾸', '소리', '남', '방', '환풍기', '소리']


In [11]:
hotelRDD = spark.sparkContext.parallelize(hotelnoun)

In [12]:
wc = hotelRDD.flatMap(lambda x: x.split(' ')) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(lambda x1, x2: x1 + x2) \
                  .map(lambda x: (x[1], x[0])) \
                  .sortByKey(ascending=False).collect()

for (count, word) in wc[:30]:
    print("{} : {}".format(word, count))
    

위치 : 149
호텔 : 138
직원 : 104
가격 : 98
이용 : 81
조식 : 74
신라 : 67
시설 : 66
방 : 62
좀 : 61
룸 : 61
것 : 59
대비 : 50
스테이 : 49
조금 : 46
생각 : 46
상태 : 45
다만 : 42
때 : 41
곳 : 41
객실 : 40
숙소 : 39
소리 : 37
출장 : 37
층 : 37
냄새 : 34
더 : 34
청결 : 33
청소 : 33
침구 : 31


### Spark를 활용하여 product_click_new.log 파일로 날짜 데이터에 대한 전처리를 처리해보자~

In [13]:
click = spark.read.csv("data/product_click_new.log", sep=" ", inferSchema=True)

In [14]:
click.show(5)

+------------+----+
|         _c0| _c1|
+------------+----+
|201612120944|p001|
|201612120944|p003|
|201612120944|p003|
|201612120945|p008|
|201612121052|p008|
+------------+----+
only showing top 5 rows



In [24]:
click = click.withColumnRenamed("_c0", "clicktime")\
       .withColumnRenamed("_c1", "pid")
click.show(10)

+------------+----+----+-----+---+----+------+
|   clicktime| pid|year|month|day|hour|minute|
+------------+----+----+-----+---+----+------+
|201612120944|p001|2016|   12| 12|   9|    44|
|201612120944|p003|2016|   12| 12|   9|    44|
|201612120944|p003|2016|   12| 12|   9|    44|
|201612120945|p008|2016|   12| 12|   9|    45|
|201612121052|p008|2016|   12| 12|  10|    52|
|201612121052|p006|2016|   12| 12|  10|    52|
|201612121052|p010|2016|   12| 12|  10|    52|
|201612121115|p002|2016|   12| 12|  11|    15|
|201612121116|p009|2016|   12| 12|  11|    16|
|201612120944|p001|2016|   12| 12|   9|    44|
+------------+----+----+-----+---+----+------+
only showing top 10 rows



In [16]:
click.printSchema()

root
 |-- clicktime: long (nullable = true)
 |-- pid: string (nullable = true)



In [17]:
from pyspark.sql.functions import col
from pyspark.sql.functions import StringType
click = click.withColumn("clicktime",col("clicktime").cast(StringType()))

In [18]:
click.printSchema()

root
 |-- clicktime: string (nullable = true)
 |-- pid: string (nullable = true)



In [19]:
import pyspark.sql.functions as f
click = click.withColumn('year',f.year(f.to_timestamp('clicktime', 'yyyyMMddhhmm')))
click = click.withColumn("month",f.month(f.to_timestamp('clicktime', 'yyyyMMddhhmm')))
click = click.withColumn("day",f.dayofmonth(f.to_timestamp('clicktime', 'yyyyMMddhhmm')))
click = click.withColumn("hour",f.hour(f.to_timestamp('clicktime', 'yyyyMMddhhmm')))
click = click.withColumn("minute",f.minute(f.to_timestamp('clicktime', 'yyyyMMddhhmm')))

In [23]:
click.show(10)

+------------+----+----+-----+---+----+------+
|   clicktime| pid|year|month|day|hour|minute|
+------------+----+----+-----+---+----+------+
|201612120944|p001|2016|   12| 12|   9|    44|
|201612120944|p003|2016|   12| 12|   9|    44|
|201612120944|p003|2016|   12| 12|   9|    44|
|201612120945|p008|2016|   12| 12|   9|    45|
|201612121052|p008|2016|   12| 12|  10|    52|
|201612121052|p006|2016|   12| 12|  10|    52|
|201612121052|p010|2016|   12| 12|  10|    52|
|201612121115|p002|2016|   12| 12|  11|    15|
|201612121116|p009|2016|   12| 12|  11|    16|
|201612120944|p001|2016|   12| 12|   9|    44|
+------------+----+----+-----+---+----+------+
only showing top 10 rows



In [21]:
click.groupby("hour").count().show()

+----+-----+
|hour|count|
+----+-----+
|   9|  110|
|  10|   80|
|  11|  120|
+----+-----+



In [22]:
click.select(f.hour(f.to_timestamp(click.clicktime, 'yyyyMMddhhmm')).alias('dt')).groupby('dt').count().show()

+---+-----+
| dt|count|
+---+-----+
|  9|  110|
| 10|   80|
| 11|  120|
+---+-----+



## Spark을 사용하여 타이타닉 데이터셋의 데이터 전처리를 처리해보자~

In [29]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import mean,col,split, col, regexp_extract, when, lit
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

In [30]:
titanic_df = spark.read.csv("data/train.csv",header = 'True',inferSchema='True')

In [31]:
passengers_count = titanic_df.count()

In [32]:
print(passengers_count)

891


In [33]:
titanic_df.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+------

In [34]:
titanic_df.describe().show()

+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|summary|      PassengerId|           Survived|            Pclass|                Name|   Sex|               Age|             SibSp|              Parch|            Ticket|             Fare|Cabin|Embarked|
+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|  count|              891|                891|               891|                 891|   891|               714|               891|                891|               891|              891|  204|     889|
|   mean|            446.0| 0.3838383838383838| 2.308641975308642|                null|  null| 29.69911764705882|0.5230078563411896|0.38159371492704824|260318.54916792738| 32.20420

In [35]:
titanic_df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [36]:
titanic_df.select("Survived","Pclass","Embarked").show()

+--------+------+--------+
|Survived|Pclass|Embarked|
+--------+------+--------+
|       0|     3|       S|
|       1|     1|       C|
|       1|     3|       S|
|       1|     1|       S|
|       0|     3|       S|
|       0|     3|       Q|
|       0|     1|       S|
|       0|     3|       S|
|       1|     3|       S|
|       1|     2|       C|
|       1|     3|       S|
|       1|     1|       S|
|       0|     3|       S|
|       0|     3|       S|
|       0|     3|       S|
|       1|     2|       S|
|       0|     3|       Q|
|       1|     2|       S|
|       0|     3|       S|
|       1|     3|       C|
+--------+------+--------+
only showing top 20 rows



## 자~ EDA(Exploratory Data Analysis)를 해봅시다요.

In [37]:
titanic_df.groupBy("Survived").count().show()

+--------+-----+
|Survived|count|
+--------+-----+
|       1|  342|
|       0|  549|
+--------+-----+



In [38]:
titanic_df.groupBy("Sex","Survived").count().show()

+------+--------+-----+
|   Sex|Survived|count|
+------+--------+-----+
|  male|       0|  468|
|female|       1|  233|
|female|       0|   81|
|  male|       1|  109|
+------+--------+-----+



In [39]:
titanic_df.groupBy("Pclass","Survived").count().show()

+------+--------+-----+
|Pclass|Survived|count|
+------+--------+-----+
|     1|       0|   80|
|     3|       1|  119|
|     1|       1|  136|
|     2|       1|   87|
|     2|       0|   97|
|     3|       0|  372|
+------+--------+-----+



In [40]:
# This function use to print feature with null values and null count 
def null_value_count(df):
  null_columns_counts = []
  numRows = df.count()
  for k in df.columns:
    nullRows = df.where(col(k).isNull()).count()
    if(nullRows > 0):
      temp = k,nullRows
      null_columns_counts.append(temp)
  return(null_columns_counts)

In [41]:
null_columns_count_list = null_value_count(titanic_df)

In [42]:
null_columns_count_list

[('Age', 177), ('Cabin', 687), ('Embarked', 2)]

In [43]:
spark.createDataFrame(null_columns_count_list, ['Column_With_Null_Value', 'Null_Values_Count']).show()

+----------------------+-----------------+
|Column_With_Null_Value|Null_Values_Count|
+----------------------+-----------------+
|                   Age|              177|
|                 Cabin|              687|
|              Embarked|                2|
+----------------------+-----------------+



In [44]:
mean_age = titanic_df.select(mean('Age')).collect()[0][0]
print(mean_age)

29.69911764705882


In [45]:
titanic_df = titanic_df.withColumn("Initial",regexp_extract(col("Name"),"([A-Za-z]+)\.",1))
# index: 0은 all, 숫자는 n번째 []안에 것만 뽑아냄

In [46]:
df = spark.createDataFrame([('100-200',)], ['str'])
df.select(regexp_extract('str', r'(\d+)-(\d+)', 2).alias('d')).collect()

[Row(d='200')]

In [47]:
titanic_df.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+-------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|Initial|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+-------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|     Mr|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|    Mrs|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|   Miss|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|    Mrs|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|     Mr|
|          6|   

In [48]:
titanic_df.select("Initial").distinct().show()

+--------+
| Initial|
+--------+
|     Don|
|    Miss|
|Countess|
|     Col|
|     Rev|
|    Lady|
|  Master|
|     Mme|
|    Capt|
|      Mr|
|      Dr|
|     Mrs|
|     Sir|
|Jonkheer|
|    Mlle|
|   Major|
|      Ms|
+--------+



In [49]:
titanic_df = titanic_df.replace(['Mlle','Mme', 'Ms', 'Dr','Major','Lady','Countess','Jonkheer','Col','Rev','Capt','Sir','Don'],
               ['Miss','Miss','Miss','Mr','Mr',  'Mrs',  'Mrs',  'Other',  'Other','Other','Mr','Mr','Mr'])

In [50]:
titanic_df.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+-------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|Initial|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+-------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|     Mr|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|    Mrs|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|   Miss|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|    Mrs|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|     Mr|
|          6|   

In [51]:
titanic_df.select("Initial").distinct().show()

+-------+
|Initial|
+-------+
|   Miss|
|  Other|
| Master|
|     Mr|
|    Mrs|
+-------+



In [52]:
titanic_df.groupby('Initial').avg('Age').collect()

[Row(Initial='Miss', avg(Age)=21.86),
 Row(Initial='Other', avg(Age)=45.888888888888886),
 Row(Initial='Master', avg(Age)=4.574166666666667),
 Row(Initial='Mr', avg(Age)=32.73960880195599),
 Row(Initial='Mrs', avg(Age)=35.981818181818184)]

In [53]:
titanic_df.filter(titanic_df.Age==46).select("Initial").show()

+-------+
|Initial|
+-------+
|     Mr|
|     Mr|
|     Mr|
+-------+



In [54]:
titanic_df.select("Age").show()

+----+
| Age|
+----+
|22.0|
|38.0|
|26.0|
|35.0|
|35.0|
|null|
|54.0|
| 2.0|
|27.0|
|14.0|
| 4.0|
|58.0|
|20.0|
|39.0|
|14.0|
|55.0|
| 2.0|
|null|
|31.0|
|null|
+----+
only showing top 20 rows



In [55]:
titanic_df = titanic_df.withColumn("Age",when((titanic_df["Initial"] == "Miss") & (titanic_df["Age"].isNull()), 22).otherwise(titanic_df["Age"]))
titanic_df = titanic_df.withColumn("Age",when((titanic_df["Initial"] == "Other") & (titanic_df["Age"].isNull()), 46).otherwise(titanic_df["Age"]))
titanic_df = titanic_df.withColumn("Age",when((titanic_df["Initial"] == "Master") & (titanic_df["Age"].isNull()), 5).otherwise(titanic_df["Age"]))
titanic_df = titanic_df.withColumn("Age",when((titanic_df["Initial"] == "Mr") & (titanic_df["Age"].isNull()), 33).otherwise(titanic_df["Age"]))
titanic_df = titanic_df.withColumn("Age",when((titanic_df["Initial"] == "Mrs") & (titanic_df["Age"].isNull()), 36).otherwise(titanic_df["Age"]))

In [56]:
titanic_df.groupBy("Embarked").count().show()

+--------+-----+
|Embarked|count|
+--------+-----+
|       Q|   77|
|    null|    2|
|       C|  168|
|       S|  644|
+--------+-----+



In [57]:
titanic_df = titanic_df.na.fill({"Embarked" : 'S'})

In [58]:
titanic_df = titanic_df.drop("Cabin")

In [59]:
titanic_df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Embarked: string (nullable = false)
 |-- Initial: string (nullable = true)



In [60]:
titanic_df = titanic_df.withColumn("Family_Size",col('SibSp')+col('Parch'))

In [61]:
titanic_df.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+--------+-------+-----------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Embarked|Initial|Family_Size|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+--------+-------+-----------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25|       S|     Mr|          1|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|       C|    Mrs|          1|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925|       S|   Miss|          0|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1|       S|    Mrs|          1|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.0

In [62]:
titanic_df.groupBy("Family_Size").count().show()

+-----------+-----+
|Family_Size|count|
+-----------+-----+
|          1|  161|
|          6|   12|
|          3|   29|
|          5|   22|
|          4|   15|
|          7|    6|
|         10|    7|
|          2|  102|
|          0|  537|
+-----------+-----+



In [None]:
titanic_df = titanic_df.withColumn('Alone',lit(0))

In [None]:
titanic_df = titanic_df.withColumn("Alone",when(titanic_df["Family_Size"] == 0, 1).otherwise(titanic_df["Alone"]))

In [None]:
titanic_df.columns

In [None]:
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(titanic_df) for column in ["Sex","Embarked","Initial"]]
pipeline = Pipeline(stages=indexers)
titanic_df = pipeline.fit(titanic_df).transform(titanic_df)

In [None]:
titanic_df.show()

In [None]:
titanic_df.printSchema()

In [None]:
titanic_df = titanic_df.drop("PassengerId","Name","Ticket","Cabin","Embarked","Sex","Initial")

In [None]:
titanic_df.show()