In [3]:
!pip install pyspark



[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.0[0m[39;49m -> [0m[32;49m25.0.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython -m pip install --upgrade pip[0m


In [4]:
# hostname 경고 해결
# WARN Utils: Your hostname, ... resolves to a loopback address: 127.0.0.1; using 192.168.100.101 instead

import os
os.environ["SPARK_LOCAL_IP"] = "192.168.100.101"


## createDataFrame
```
SparkSession.createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)
```

In [5]:
from pyspark.sql import (Row, SparkSession)
from pyspark.sql.functions import col, asc, desc

def parse_line(line:str):
    fields = line.split("|")
    return Row(
        name = str(fields[0]),
        country = str(fields[1]),
        email = str(fields[2]),
        compensation = int(fields[3])
    )

spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

# map사용을 위해 sparkContext.textFile을 사용하여 RDD로 반환
lines = spark.sparkContext.textFile("file:////Users/jeongmieun/study/spark/sample/income.txt")
# map: RDD의 각 요소마다 주어진 함수를 적용
income_date = lines.map(parse_line)

# SparkSession.createDateFrame(data, schema=None, samplingRatio=None, verifySchema=True)
schema_income = spark.createDataFrame(data=income_date).cache() # cache에 저장

# Creates or replaces a local temporary view with this DataFrame
schema_income.createOrReplaceTempView("income")
# schema_income.show()

# avg_income = spark.sql(
    # "SELECT * FROM income WHERE compensation >= (SELECT avg(compensation) FROM income)")
# avg_income.show()

# collect: 클러스터에 분산되어 있는 데이터(RDD나 DataFrame)의 모든 데이터를 드라이버 노드로 가져와서 파이썬 리스트 형태로 반환
# for data in avg_income.collect():
    # print(data)
    # print(data.name)

# use function
schema_income.groupBy("country").count().orderBy(col("count").desc()).show()

25/02/23 16:28:10 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+--------------------+-----+
|             country|count|
+--------------------+-----+
|           Australia|   10|
|           Singapore|    9|
|             Ecuador|    9|
|            Dominica|    9|
|          Madagascar|    9|
|           Nicaragua|    9|
|              Kuwait|    9|
|               Congo|    9|
|            Thailand|    9|
|             Senegal|    8|
|Sao Tome and Prin...|    8|
|Virgin Islands, B...|    8|
|              Zambia|    8|
|  Dominican Republic|    8|
|                Mali|    8|
|             Belgium|    7|
|Palestinian Terri...|    7|
|             Lesotho|    7|
|         Isle of Man|    7|
|             Bolivia|    7|
+--------------------+-----+
only showing top 20 rows



In [6]:
from pyspark.sql.functions import avg, col, round as r

spark = SparkSession.builder.appName("sql_import_csv").getOrCreate()
csv_file_path = "file:///Users/jeongmieun/study/spark/sample/age.csv"

# header option: either csv has header or note(default: header = false)
# inferSchema: either all columns are str or not(guess type)
data = spark.read.option("header","true").option("inferSchema","true").csv(csv_file_path)

data.printSchema()

25/02/23 16:28:12 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- country: string (nullable = true)



In [7]:
# show column name with data
# data.select("name","age").show()

# filter the data for age of 20 above
# data.filter(data.age>20).show()

# group by age and aggregates for count
# data.groupBy("age").count().show()

# custom arithmetic
# data.select(data.name, data.age, data.age-10).show()

# column alias
# data.select(data.name, col("age").alias("new_age")).show()

# average
# data.select(data.name, data.age, data.country).groupBy("country").avg("age").show()

# average and sort
# data.select(data.name, data.age, data.country).groupBy("country").avg("age").sort("avg(age)").show()

# average and round
# agg: groupBy로 그룹을 나눈 후 각 그룹에 대해 집계(aggregation) 연산을 수행하기 위해 사용
data.select(data.name, data.age, data.country).groupBy("country").agg(r(avg("age"),2).alias("avg_age")).show()

+--------------------+-------+
|             country|avg_age|
+--------------------+-------+
|                Chad|  36.25|
|            Paraguay|  47.78|
|            Anguilla|   72.0|
|               Macao|   72.0|
|Heard Island and ...|   30.0|
|             Senegal|   53.0|
|              Sweden|  45.33|
|             Tokelau|  34.17|
|French Southern T...|  50.67|
|            Kiribati|  48.67|
|   Republic of Korea|  58.17|
|              Guyana|   39.0|
|             Eritrea|  39.75|
|              Jersey|   58.8|
|         Philippines|  48.33|
|            Djibouti|   38.6|
|               Tonga|   49.0|
|      Norfolk Island|  35.33|
|            Malaysia|  60.67|
|           Singapore|   40.0|
+--------------------+-------+
only showing top 20 rows



## functions.explode(col)

In [8]:
from pyspark.sql import functions

spark = SparkSession.builder.appName("df_wordcount").getOrCreate()

# functions.explode(col)
# Returns a new row for each element in the given array or map
df = spark.createDataFrame([
    Row(a=1, intlist=[1,2,3],
        mapfield={"a":"b"}
       )])
df.select(functions.explode(df.intlist).alias("anInt")).collect()
      

25/02/23 16:28:12 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


[Row(anInt=1), Row(anInt=2), Row(anInt=3)]

## functions.split(str, pattern, limit=-1)

In [9]:
# functions.split(str, pattern, limit=-1) limit된 숫자의 크기로 나눔
# Splits str around matches of the given pattern
df = spark.createDataFrame([
    Row(word="hello world and pyspark")])
df.select(functions.split(df.word, ' ').alias("word")).collect()

[Row(word=['hello', 'world', 'and', 'pyspark'])]

In [10]:
txt_file_path = "file:///Users/jeongmieun/study/spark/sample/lorem_ipsum.txt"

# read.text로 가져오면 한 개의 value을 가진 DataFrame으로 저장
df = spark.read.text(txt_file_path)
df.show()


+--------------------+
|               value|
+--------------------+
|Lorem ipsum dolor...|
|                    |
|Orci eu lobortis ...|
|                    |
|Vulputate enim nu...|
|                    |
|Sit amet nulla fa...|
|                    |
|Nibh cras pulvina...|
|                    |
|Arcu felis bibend...|
|                    |
|Vestibulum sed ar...|
|                    |
|Sit amet tellus c...|
|                    |
|Augue mauris augu...|
|                    |
|Pellentesque mass...|
|                    |
+--------------------+
only showing top 20 rows



In [11]:
words = df.select(
    functions.explode(
        functions.split(df.value, ' ')).alias("word"))

words.show()

+-----------+
|       word|
+-----------+
|      Lorem|
|      ipsum|
|      dolor|
|        sit|
|      amet,|
|consectetur|
| adipiscing|
|      elit,|
|        sed|
|         do|
|    eiusmod|
|     tempor|
| incididunt|
|         ut|
|     labore|
|         et|
|     dolore|
|      magna|
|    aliqua.|
|         Et|
+-----------+
only showing top 20 rows



In [12]:
word_counts = words.groupBy("word").count().orderBy(functions.col("count").desc())
word_counts.show()

+------------+-----+
|        word|count|
+------------+-----+
|         sed|  194|
|          in|  164|
|        amet|  149|
|         sit|  147|
|          ut|  140|
|        eget|  131|
|          id|  127|
|          at|  120|
|       vitae|  118|
|          et|  117|
|        nunc|  113|
|          eu|  108|
|         non|  102|
|            |   99|
|          ac|   97|
|      tellus|   97|
|        diam|   95|
|     viverra|   95|
|        enim|   93|
|pellentesque|   93|
+------------+-----+
only showing top 20 rows



## types.StructField(name, dataType, nullable = True, metadata=None)

In [13]:
from pyspark.sql import(
functions as f, 
Row,
SparkSession, 
types as t
)

spark = SparkSession.builder.appName("df_struct").getOrCreate()

# csv 파일에 header가 없을 때 hearder를 먼저 생성
# types.StructField(name, dataType, nullable = True, metadata=None)
table_schema = t.StructType([
    t.StructField("country", t.StringType(), True),
    t.StructField("temperature", t. FloatType(), True),
    t.StructField("observed_date", t.StringType(), True)])

csv_file_path = "file:///Users/jeongmieun/study/spark/sample/temp_with_date.csv"
df = spark.read.schema(table_schema).csv(csv_file_path)
df.printSchema()

root
 |-- country: string (nullable = true)
 |-- temperature: float (nullable = true)
 |-- observed_date: string (nullable = true)



25/02/23 16:28:13 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [18]:
data = df.select("country", "temperature", "observed_date")
min_temperature = data.groupBy("country").min("temperature")

min_temperature.show()

+--------------------+----------------+
|             country|min(temperature)|
+--------------------+----------------+
|                Chad|           -24.0|
|            Anguilla|           -40.0|
|            Paraguay|            30.0|
|               Macao|           -34.0|
|Heard Island and ...|           -39.0|
|               Yemen|           -33.0|
|             Senegal|           -21.0|
|              Sweden|           -29.0|
|             Tokelau|           -35.0|
|            Kiribati|           -26.0|
|French Southern T...|           -22.0|
|   Republic of Korea|           -18.0|
|              Guyana|           -28.0|
|             Eritrea|           -40.0|
|         Philippines|           -34.0|
|              Jersey|           -21.0|
|      Norfolk Island|           -28.0|
|               Tonga|           -40.0|
|           Singapore|           -25.0|
|            Malaysia|           -21.0|
+--------------------+----------------+
only showing top 20 rows



In [31]:
# celsius to fahrenheit: (0°C × 9/5) + 32 
f_temp = data.withColumn(
    "temperature",
    functions.round(f.col("temperature") * 9 / 5) + 32).select("country", "temperature")

f_temp.show()

+--------------------+-----------+
|             country|temperature|
+--------------------+-----------+
|                Guam|      -13.0|
|                Guam|      102.0|
|              Serbia|      -31.0|
|       French Guiana|       21.0|
|Falkland Islands ...|      -40.0|
|              Brazil|       59.0|
|             Tunisia|      -24.0|
|            Portugal|       45.0|
|                Iran|       -8.0|
|           Australia|       23.0|
|              Gambia|       70.0|
|               Italy|       88.0|
|          Guadeloupe|      -38.0|
|        South Africa|      -11.0|
|              Malawi|       21.0|
|                Iran|       93.0|
|      Norfolk Island|       23.0|
|Lao People's Demo...|       77.0|
|   Republic of Korea|        0.0|
|           Singapore|        5.0|
+--------------------+-----------+
only showing top 20 rows



In [4]:
from pyspark.sql import (
    functions as f,
    SparkSession,
    types as t
)

spark = SparkSession.builder.appName("df_total").getOrCreate()
table_schema = t.StructType([
    t.StructField("customer_name", t.StringType(), True),
    t.StructField("product_id", t.IntegerType(), True),
    t.StructField("price", t.IntegerType(), True)])

csv_file_path = "file:///Users/jeongmieun/study/spark/sample/product.csv"

df = spark.read.schema(table_schema).csv(csv_file_path)
customer_spent = df.groupBy("customer_name").agg(f.round(f.sum("price"),2).alias("cost"))
# customer_spent.show()

sorted_customer_spent = customer_spent.orderBy(f.col("cost").desc())
sorted_customer_spent.show()


+-----------------+----+
|    customer_name|cost|
+-----------------+----+
|     Damion Wolfe|1397|
| Benedict Frazier| 998|
|  Giuseppe Miller| 997|
|    Garret Martin| 997|
|Erminia Robertson| 997|
|     Milan Gibson| 996|
|     Rudy Wheeler| 994|
|   Kathey Baldwin| 994|
|   Williemae Bell| 992|
|Gearldine Aguilar| 988|
|      Jewel Parks| 987|
|     Hyman Castro| 985|
|    Noriko Medina| 984|
|     Garfield Day| 982|
|      Dacia Adams| 981|
|     Taisha Henry| 980|
|    Branda Valdez| 978|
|     Fumiko Weber| 976|
|Geraldo Alexander| 975|
|      Walker Pope| 975|
+-----------------+----+
only showing top 20 rows

