In [1]:
import findspark
findspark.init()

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("learn-s").getOrCreate()

In [4]:
stocks = [
    ('Google', 'GOOGL', 'USA', 2984, 'USD'), 
    ('Netflix', 'NFLX', 'USA', 645, 'USD'),
    ('Amazon', 'AMZN', 'USA', 3518, 'USD'),
    ('Tesla', 'TSLA', 'USA', 1222, 'USD'),
    ('Tencent', '0700', 'Hong Kong', 483, 'HKD'),
    ('Toyota', '7203', 'Japan', 2006, 'JPY'),
    ('Samsung', '005930', 'Korea', 70600, 'KRW'),
    ('Kakao', '035720', 'Korea', 125000, 'KRW'),
]

In [8]:
stockSchema = ["name", "ticker", "country", "price", "currency"]

In [9]:
df = spark.createDataFrame(data=stocks, schema=stockSchema)

In [10]:
df.dtypes

[('name', 'string'),
 ('ticker', 'string'),
 ('country', 'string'),
 ('price', 'bigint'),
 ('currency', 'string')]

In [11]:
df.show()

+-------+------+---------+------+--------+
|   name|ticker|  country| price|currency|
+-------+------+---------+------+--------+
| Google| GOOGL|      USA|  2984|     USD|
|Netflix|  NFLX|      USA|   645|     USD|
| Amazon|  AMZN|      USA|  3518|     USD|
|  Tesla|  TSLA|      USA|  1222|     USD|
|Tencent|  0700|Hong Kong|   483|     HKD|
| Toyota|  7203|    Japan|  2006|     JPY|
|Samsung|005930|    Korea| 70600|     KRW|
|  Kakao|035720|    Korea|125000|     KRW|
+-------+------+---------+------+--------+



In [12]:
df.createOrReplaceTempView("stocks")

In [13]:
spark.sql("select name from stocks").show()

+-------+
|   name|
+-------+
| Google|
|Netflix|
| Amazon|
|  Tesla|
|Tencent|
| Toyota|
|Samsung|
|  Kakao|
+-------+



In [17]:
spark.sql("select name, price from stocks").show()

+-------+------+
|   name| price|
+-------+------+
| Google|  2984|
|Netflix|   645|
| Amazon|  3518|
|  Tesla|  1222|
|Tencent|   483|
| Toyota|  2006|
|Samsung| 70600|
|  Kakao|125000|
+-------+------+



In [18]:
spark.sql("select name, price from stocks where country = 'Korea'").show()

+-------+------+
|   name| price|
+-------+------+
|Samsung| 70600|
|  Kakao|125000|
+-------+------+



In [19]:
spark.sql("select name, price from stocks where price > 2000").show()

+-------+------+
|   name| price|
+-------+------+
| Google|  2984|
| Amazon|  3518|
| Toyota|  2006|
|Samsung| 70600|
|  Kakao|125000|
+-------+------+



In [20]:
spark.sql("select name, price from stocks where price > 2000 and country = 'USA'").show()

+------+-----+
|  name|price|
+------+-----+
|Google| 2984|
|Amazon| 3518|
+------+-----+



In [22]:
spark.sql("select name, price from stocks where country LIKE 'U%'").show()

+-------+-----+
|   name|price|
+-------+-----+
| Google| 2984|
|Netflix|  645|
| Amazon| 3518|
|  Tesla| 1222|
+-------+-----+



In [25]:
spark.sql("select country, avg(price) from stocks group by country").createOrReplaceTempView("stocks_avg_fat")

In [27]:
spark.sql("select * from stocks_avg_fat").show()

+---------+----------+
|  country|avg(price)|
+---------+----------+
|Hong Kong|     483.0|
|      USA|   2092.25|
|    Japan|    2006.0|
|    Korea|   97800.0|
+---------+----------+



In [28]:
earnings = [
    ('Google', 27.99, 'USD'), 
    ('Netflix', 2.56, 'USD'),
    ('Amazon', 6.12, 'USD'),
    ('Tesla', 1.86, 'USD'),
    ('Tencent', 11.01, 'HKD'),
    ('Toyota', 224.82, 'JPY'),
    ('Samsung', 1780., 'KRW'),
    ('Kakao', 705., 'KRW')
]

In [29]:
from pyspark.sql.types import StringType, FloatType, StructType, StructField

In [30]:
earningsSchema = StructType([
    StructField("name", StringType(), True),
    StructField("eps", FloatType(), True),
    StructField("currency", StringType(), True),
])

In [31]:
earningsDF = spark.createDataFrame(data=earnings, schema=earningsSchema)

In [32]:
earningsDF.dtypes

[('name', 'string'), ('eps', 'float'), ('currency', 'string')]

In [33]:
earningsDF.createOrReplaceTempView("earnings")

In [34]:
earningsDF.select("*").show()

+-------+------+--------+
|   name|   eps|currency|
+-------+------+--------+
| Google| 27.99|     USD|
|Netflix|  2.56|     USD|
| Amazon|  6.12|     USD|
|  Tesla|  1.86|     USD|
|Tencent| 11.01|     HKD|
| Toyota|224.82|     JPY|
|Samsung|1780.0|     KRW|
|  Kakao| 705.0|     KRW|
+-------+------+--------+



In [35]:
spark.sql("select * from stocks join earnings on stocks.name = earnings.name").show()

+-------+------+---------+------+--------+-------+------+--------+
|   name|ticker|  country| price|currency|   name|   eps|currency|
+-------+------+---------+------+--------+-------+------+--------+
|  Kakao|035720|    Korea|125000|     KRW|  Kakao| 705.0|     KRW|
|Samsung|005930|    Korea| 70600|     KRW|Samsung|1780.0|     KRW|
|  Tesla|  TSLA|      USA|  1222|     USD|  Tesla|  1.86|     USD|
| Google| GOOGL|      USA|  2984|     USD| Google| 27.99|     USD|
|Tencent|  0700|Hong Kong|   483|     HKD|Tencent| 11.01|     HKD|
| Toyota|  7203|    Japan|  2006|     JPY| Toyota|224.82|     JPY|
|Netflix|  NFLX|      USA|   645|     USD|Netflix|  2.56|     USD|
| Amazon|  AMZN|      USA|  3518|     USD| Amazon|  6.12|     USD|
+-------+------+---------+------+--------+-------+------+--------+



In [36]:
# PER: Price / EPS
spark.sql("select stocks.name, (price / eps) as per \
  FROM stocks JOIN earnings \
  ON stocks.name = earnings.name").show()

+-------+------------------+
|   name|               per|
+-------+------------------+
|  Kakao| 177.3049645390071|
|Samsung|39.662921348314605|
|  Tesla|  656.989242258975|
| Google| 106.6095042658442|
|Tencent| 43.86920889728746|
| Toyota| 8.922693419839167|
|Netflix| 251.9531306315913|
| Amazon| 574.8366120563447|
+-------+------------------+



In [40]:
earningsDF.printSchema()

root
 |-- name: string (nullable = true)
 |-- eps: float (nullable = true)
 |-- currency: string (nullable = true)



In [45]:
earningsDF.agg({"eps": "mean"}).collect()

[Row(avg(eps)=344.9200008958578)]

In [47]:
from pyspark.sql import functions as F
earningsDF.agg(F.min(earningsDF.eps)).collect()

[Row(min(eps)=1.8600000143051147)]

In [48]:
earningsDF.groupBy().avg().collect()

[Row(avg(eps)=344.9200008958578)]

In [49]:
earningsDF.groupBy(earningsDF.name).avg().collect()

[Row(name='Kakao', avg(eps)=705.0),
 Row(name='Samsung', avg(eps)=1780.0),
 Row(name='Tesla', avg(eps)=1.8600000143051147),
 Row(name='Google', avg(eps)=27.989999771118164),
 Row(name='Tencent', avg(eps)=11.010000228881836),
 Row(name='Toyota', avg(eps)=224.82000732421875),
 Row(name='Netflix', avg(eps)=2.559999942779541),
 Row(name='Amazon', avg(eps)=6.119999885559082)]

In [53]:
friends = [
    ("jaeho", 39, "M"),
    ("inchul", 38, "M"),
    ("joohyun", 40, "F"),
    ("hyungjin", 38, "M"),
    ("sam", 35, "F")
]


In [62]:
from pyspark.sql.types import StringType, FloatType, StructType, StructField, IntegerType
friendsSchema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("sex", StringType(), True)
])

friendsDF = spark.createDataFrame(data=friends, schema=friendsSchema)
friendsDF.groupBy().avg().collect()

[Row(avg(age)=38.0)]

In [61]:
sorted(friendsDF.groupBy('name').agg({'age': 'mean'}).collect())

[Row(name='hyungjin', avg(age)=38.0),
 Row(name='inchul', avg(age)=38.0),
 Row(name='jaeho', avg(age)=39.0),
 Row(name='joohyun', avg(age)=40.0),
 Row(name='sam', avg(age)=35.0)]

In [66]:
sorted(friendsDF.groupBy(friendsDF.name).avg().collect())

[Row(name='hyungjin', avg(age)=38.0),
 Row(name='inchul', avg(age)=38.0),
 Row(name='jaeho', avg(age)=39.0),
 Row(name='joohyun', avg(age)=40.0),
 Row(name='sam', avg(age)=35.0)]

In [68]:
sorted(friendsDF.groupBy(['name', friendsDF.age]).count().collect())

[Row(name='hyungjin', age=38, count=1),
 Row(name='inchul', age=38, count=1),
 Row(name='jaeho', age=39, count=1),
 Row(name='joohyun', age=40, count=1),
 Row(name='sam', age=35, count=1)]