In [None]:
!wget -q https://dlcdn.apache.org/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz
!tar xf spark-3.3.1-bin-hadoop3.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.1-bin-hadoop3"

import findspark
findspark.init()


from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .appName("my_first_pyspark")\
        .config('spark.ui.port','4050')\
        .getOrCreate()

sc = spark.sparkContext

In [None]:
# Distribute two JSON documents (as plain strings) into an RDD; each element is one record.
# The first string begins with a newline; Spark's JSON reader tolerates the leading whitespace.
stringJSONRDD = sc.parallelize([
    '\n{ "id" : "123", "name": "katie","age":19}',
    '{ "id" : "233", "name": "bbbkatie","age":190,"class" :["fsd","aaaaaa"]}',
])

In [None]:
# Confirm the object is a plain RDD, then bring all of its (two) elements back to the driver.
print(stringJSONRDD.__class__)
print(stringJSONRDD.collect())

<class 'pyspark.rdd.RDD'>
['\n{ "id" : "123", "name": "katie","age":19}', '{ "id" : "233", "name": "bbbkatie","age":190,"class" :["fsd","aaaaaa"]}']


In [None]:
# Parse the RDD of JSON strings into a DataFrame (not another RDD): spark.read.json accepts
# an RDD of strings, treats each element as one JSON record, and infers the schema
# (see the printSchema output further down: age long, class array<string>, id/name string).

swimmersJSON = spark.read.json(stringJSONRDD)

In [None]:
# Register the DataFrame as a temporary view so it can be queried with SQL via spark.sql().
# The view is named "s"; later SQL refers to it by that name.
swimmersJSON.createOrReplaceTempView(name="s")

In [None]:
# Inspect the DataFrame three ways: its Python type, the raw Row objects, and a formatted table.
# show(n) prints at most n rows; n defaults to 20.
print(swimmersJSON.__class__)
print(swimmersJSON.collect())
swimmersJSON.show(n=20)

<class 'pyspark.sql.dataframe.DataFrame'>
[Row(age=19, class=None, id='123', name='katie'), Row(age=190, class=['fsd', 'aaaaaa'], id='233', name='bbbkatie')]
+---+-------------+---+--------+
|age|        class| id|    name|
+---+-------------+---+--------+
| 19|         null|123|   katie|
|190|[fsd, aaaaaa]|233|bbbkatie|
+---+-------------+---+--------+



In [None]:
swimmersJSON.show(n=1)  # only the first row

+---+-----+---+-----+
|age|class| id| name|
+---+-----+---+-----+
| 19| null|123|katie|
+---+-----+---+-----+
only showing top 1 row



In [None]:
# Example of manipulating the data with SQL: query the temp view "s" registered above.
# SQL keywords are case-insensitive, so the mixed-case "frOM" still parses.
spark.sql(sqlQuery="select * frOM s").show()

+---+-------------+---+--------+
|age|        class| id|    name|
+---+-------------+---+--------+
| 19|         null|123|   katie|
|190|[fsd, aaaaaa]|233|bbbkatie|
+---+-------------+---+--------+



In [None]:
# Print the inferred schema as a tree to check the column types

swimmersJSON.printSchema()

root
 |-- age: long (nullable = true)
 |-- class: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)



In [None]:
# A DataFrame can also be created with an explicitly declared schema instead of an inferred one.

# Explicit names instead of a wildcard import, so readers can see where each type comes from.
from pyspark.sql.types import LongType, StringType, StructField, StructType

# Generate a few in-memory rows (Python tuples, not CSV text).
stringCSVRDD = sc.parallelize(
    [(123, 'katie', 19, 'brown'),
     (156, 'baaaie', 122, 'greem'),
     (223, 'sdfe', 1933, 'blue')])

# The schema is built from pyspark.sql.types objects. Each StructField is
# (column name, data type, nullable); "id" is declared non-nullable.
schema = StructType([
    StructField("id", LongType(), False),
    StructField("name", StringType(), True),
    StructField("age", LongType(), True),
    StructField("eyecolor", StringType(), True),
])

# Apply the schema to the RDD to create the DataFrame.
wooseok = spark.createDataFrame(stringCSVRDD, schema)

# Register a temporary view so the DataFrame can also be queried with SQL.
wooseok.createOrReplaceTempView("NOUI")

wooseok.show()

wooseok.printSchema()

+---+------+----+--------+
| id|  name| age|eyecolor|
+---+------+----+--------+
|123| katie|  19|   brown|
|156|baaaie| 122|   greem|
|223|  sdfe|1933|    blue|
+---+------+----+--------+

root
 |-- id: long (nullable = false)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- eyecolor: string (nullable = true)



In [None]:
# Exercise: read an external JSON file into a DataFrame (schema is inferred).
# NOTE(review): this path is on Google Drive, so Drive must already be mounted at
# /content/drive in this runtime — no cell in this notebook mounts it.
flight_summary_path = "/content/drive/MyDrive/Colab Notebooks/데이터분석을위한프로그래밍/2015-summary.json"
df = spark.read.json(flight_summary_path)
df.show(n=3)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
+-----------------+-------------------+-----+
only showing top 3 rows



In [None]:
df.printSchema() # no schema was supplied, so column names and types were inferred from the JSON file

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)



In [None]:
# Define the schema up front and pass it to the reader instead of letting Spark infer it.
# NOTE(review): for file-based sources Spark generally treats a user-supplied schema as
# nullable, so the nullable=False flags below are likely not enforced — confirm before relying on them.
Myschema = StructType(
    [
        StructField("DEST_COUNTRY_NAME", StringType(), False),
        StructField("ORIGIN_COUNTRY_NAME", StringType(), True),
        StructField("count", LongType(), False),
    ]
)

df = (
    spark.read
    .schema(Myschema)
    .format("json")
    .load("/content/drive/MyDrive/Colab Notebooks/데이터분석을위한프로그래밍/2015-summary.json")
)
df.show(n=3)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
+-----------------+-------------------+-----+
only showing top 3 rows



In [None]:
df.schema.names  # column names, in schema order

['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count']

In [None]:
# Selections
# collect() returns every row to the driver as a list of Row objects (fine for this 3-row DataFrame).

wooseok.collect()

[Row(id=123, name='katie', age=19, eyecolor='brown'),
 Row(id=156, name='baaaie', age=122, eyecolor='greem'),
 Row(id=223, name='sdfe', age=1933, eyecolor='blue')]

In [None]:
wooseok.head(1)  # list containing just the first Row

[Row(id=123, name='katie', age=19, eyecolor='brown')]

In [None]:
wooseok.select(["id", "age"]).show(n=2)  # project two columns, show the first 2 rows

+---+---+
| id|age|
+---+---+
|123| 19|
|156|122|
+---+---+
only showing top 2 rows



In [None]:
# Import only the column functions this notebook uses. A wildcard import of
# pyspark.sql.functions would shadow Python builtins such as sum, min, max, abs and round.
from pyspark.sql.functions import col, expr, lit

# expr() parses a SQL expression string, so "id AS ID" selects and renames in one step.
wooseok.select(expr("id AS ID"), expr("age AS AGE")).show(2)

+---+---+
| ID|AGE|
+---+---+
|123| 19|
|156|122|
+---+---+
only showing top 2 rows



In [None]:
wooseok.selectExpr("id AS ID").show(n=2)  # selectExpr = select(expr(...)) shorthand

+---+
| ID|
+---+
|123|
|156|
+---+
only showing top 2 rows



In [None]:
# selectExpr(): keep every original column and add a new boolean column that says whether
# the origin and destination countries are the same.
df.show(n=3)

df.selectExpr(
    "*",  # all original columns
    "(DEST_COUNTRY_NAME =ORIGIN_COUNTRY_NAME) as withinCOUNTRY",
).show(n=5)


+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
+-----------------+-------------------+-----+
only showing top 3 rows

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCOUNTRY|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
|    United States|            Ireland|  344|        false|
|            Egypt|      United States|   15|        false|
|    United States|              India|   62|        false|
+-----------------+-------------------+-----+-------------+
only showing top 5 rows



In [None]:
# Aggregate functions: avg, min, max, sum, count

# Average of the `count` column, plus the number of distinct *destination* countries
# (DEST_COUNTRY_NAME, de-duplicated before counting).

df.selectExpr("avg(count)", "count(distinct(DEST_COUNTRY_NAME))").show()

+-----------+---------------------------------+
| avg(count)|count(DISTINCT DEST_COUNTRY_NAME)|
+-----------+---------------------------------+
|1770.765625|                              132|
+-----------+---------------------------------+



In [None]:
# lit ("literal"): wraps an explicit constant value as a Column so Spark can use it.
# NOTE(review): the alias "One" does not match the value 1456.
df.select("*", lit(1456).alias("One")).show(n=2)

+-----------------+-------------------+-----+----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count| One|
+-----------------+-------------------+-----+----+
|    United States|            Romania|   15|1456|
|    United States|            Croatia|    1|1456|
+-----------------+-------------------+-----+----+
only showing top 2 rows



In [None]:
# withColumn returns a new DataFrame with an added column (here a constant 21); df is unchanged.
df2 = df.withColumn(colName="dsfadsf", col=lit(21))
df2.show(n=2)

+-----------------+-------------------+-----+-------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|dsfadsf|
+-----------------+-------------------+-----+-------+
|    United States|            Romania|   15|     21|
|    United States|            Croatia|    1|     21|
+-----------------+-------------------+-----+-------+
only showing top 2 rows



In [None]:
# Add a new column holding whether the origin and destination countries are the same.
df2 = df.withColumn(
    "wefaewffds",
    expr("DEST_COUNTRY_NAME =ORIGIN_COUNTRY_NAME"),
)
df2.show(n=2)

+-----------------+-------------------+-----+----------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|wefaewffds|
+-----------------+-------------------+-----+----------+
|    United States|            Romania|   15|     false|
|    United States|            Croatia|    1|     false|
+-----------------+-------------------+-----+----------+
only showing top 2 rows



In [None]:
# filter and where are aliases: a Column condition and a SQL string select the same rows.
df.filter(df["count"] < 2).show(n=2)
df.where(condition="count<2").show(n=2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
+-----------------+-------------------+-----+
only showing top 2 rows

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



In [None]:
df.select("ORIGIN_COUNTRY_NAME").dropDuplicates().count()  # number of distinct origin countries

125

In [None]:
# A few of the distinct origin countries (row order after distinct() is not guaranteed).
df.select("ORIGIN_COUNTRY_NAME").distinct().show(5)

+-------------------+
|ORIGIN_COUNTRY_NAME|
+-------------------+
|           Paraguay|
|             Russia|
|           Anguilla|
|            Senegal|
|             Sweden|
+-------------------+
only showing top 5 rows



In [None]:
df.select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").dropDuplicates().count()  # distinct (dest, origin) pairs

256

In [None]:
# A few distinct (destination, origin) pairs (row order after distinct() is not guaranteed).
df.select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").distinct().show(5)

+-----------------+-------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|
+-----------------+-------------------+
|          Croatia|      United States|
|           Kosovo|      United States|
|          Romania|      United States|
|          Ireland|      United States|
|    United States|              Egypt|
+-----------------+-------------------+
only showing top 5 rows



In [None]:
# Draw a random sample. fraction is the expected proportion of rows, not an exact count.
# The fixed seed makes the sample repeatable for the same input partitioning, so the
# count and the show below operate on the same sampled rows.
seed = 10
withReplacement = False
fraction = 0.5

sampled = df.sample(withReplacement=withReplacement, fraction=fraction, seed=seed)
print(sampled.count())
sampled.show()


117
+--------------------+--------------------+-----+
|   DEST_COUNTRY_NAME| ORIGIN_COUNTRY_NAME|count|
+--------------------+--------------------+-----+
|       United States|             Romania|   15|
|       United States|               India|   62|
|       United States|           Singapore|    1|
|             Moldova|       United States|    1|
|       United States|    Marshall Islands|   39|
|              Guyana|       United States|   64|
|            Anguilla|       United States|   41|
|Saint Vincent and...|       United States|    1|
|       United States|Federated States ...|   69|
|       United States|         Netherlands|  660|
|             Iceland|       United States|  181|
|          Luxembourg|       United States|  155|
|         The Bahamas|       United States|  955|
|       United States|              Angola|   13|
|            Suriname|       United States|    1|
|              Mexico|       United States| 7140|
|       United States|              Cyprus|   

In [None]:
# Check how many CPU cores this runtime has. The session was started with master("local"),
# which uses a single worker thread regardless of this number.
!nproc

2


In [None]:
# Small example tables for the join demos: people, graduate programs, and Spark roles.
# The column name "graducate_program" (sic) is used by the join expression below, so the
# original spellings are kept.
person = spark.createDataFrame(
    [
        (0, "Bill Chambers", 0, [100]),
        (1, "mad zambers", 1, [500, 250, 100]),
        (2, "kill song", 1, [250, 100]),
    ],
    ["id", "name", "graducate_program", "spark_status"],
)

graduateProgram = spark.createDataFrame(
    [
        (0, "masters", "school of information", "UC Berkeley"),
        (2, "masters", "ECCS", "UC Berkeley"),
        (1, "PH.D", "ECCS", "UC Berkeley"),
    ],
    ["id", "degreee", "department", "school"],
)

sparkStatus = spark.createDataFrame(
    [
        (500, "ViCE president"),
        (250, "PMC memver"),
        (100, "contributor"),
    ],
    ["id", "status"],
)


In [None]:
# Display each of the three example tables in turn.
for table in (person, graduateProgram, sparkStatus):
    table.show()

+---+-------------+-----------------+---------------+
| id|         name|graducate_program|   spark_status|
+---+-------------+-----------------+---------------+
|  0|Bill Chambers|                0|          [100]|
|  1|  mad zambers|                1|[500, 250, 100]|
|  2|    kill song|                1|     [250, 100]|
+---+-------------+-----------------+---------------+

+---+-------+--------------------+-----------+
| id|degreee|          department|     school|
+---+-------+--------------------+-----------+
|  0|masters|school of informa...|UC Berkeley|
|  2|masters|                ECCS|UC Berkeley|
|  1|   PH.D|                ECCS|UC Berkeley|
+---+-------+--------------------+-----------+

+---+--------------+
| id|        status|
+---+--------------+
|500|ViCE president|
|250|    PMC memver|
|100|   contributor|
+---+--------------+



In [None]:
# Joins

# Inner join, approach 1: build the join condition as a Column expression that matches
# each person's graducate_program to a graduate program's id.
joinExpression = (person["graducate_program"] == graduateProgram["id"])


In [None]:
# Inner join: only people whose program id matches a graduate program are kept.
joinType = "inner"
person.join(graduateProgram, on=joinExpression, how=joinType).show()

+---+-------------+-----------------+---------------+---+-------+--------------------+-----------+
| id|         name|graducate_program|   spark_status| id|degreee|          department|     school|
+---+-------------+-----------------+---------------+---+-------+--------------------+-----------+
|  0|Bill Chambers|                0|          [100]|  0|masters|school of informa...|UC Berkeley|
|  1|  mad zambers|                1|[500, 250, 100]|  1|   PH.D|                ECCS|UC Berkeley|
|  2|    kill song|                1|     [250, 100]|  1|   PH.D|                ECCS|UC Berkeley|
+---+-------------+-----------------+---------------+---+-------+--------------------+-----------+



In [None]:
# Full outer join: unmatched rows from either side are kept, with nulls for the other side's
# columns (e.g. the graduate program with id 2 has no matching person).
joinType = "outer"
person.join(graduateProgram, on=joinExpression, how=joinType).show()

+----+-------------+-----------------+---------------+---+-------+--------------------+-----------+
|  id|         name|graducate_program|   spark_status| id|degreee|          department|     school|
+----+-------------+-----------------+---------------+---+-------+--------------------+-----------+
|   0|Bill Chambers|                0|          [100]|  0|masters|school of informa...|UC Berkeley|
|   1|  mad zambers|                1|[500, 250, 100]|  1|   PH.D|                ECCS|UC Berkeley|
|   2|    kill song|                1|     [250, 100]|  1|   PH.D|                ECCS|UC Berkeley|
|null|         null|             null|           null|  2|masters|                ECCS|UC Berkeley|
+----+-------------+-----------------+---------------+---+-------+--------------------+-----------+

