In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType, ArrayType
from pyspark.sql import functions as F
import string

In [4]:
#create SparkSession
spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()

#get SparkContext by SparkSession
sc = spark.sparkContext

#Hello World
df = spark.read.csv("logstash-2022.04.27.csv",sep=',',header=False)
df2 = df.toDF("time","content")
df2.printSchema() # print table structure
df2.show()

root
 |-- time: string (nullable = true)
 |-- content: string (nullable = true)

+--------------------+--------------------+
|                time|             content|
+--------------------+--------------------+
|2022-04-27T06:59:...|"pattern not matc...|
|2022-04-27T06:59:...|"pattern not matc...|
|2022-04-27T06:59:...|"pattern not matc...|
|2022-04-27T06:59:...|"pattern not matc...|
|2022-04-27T06:59:...|"pattern not matc...|
|2022-04-27T06:59:...|"pattern not matc...|
|2022-04-27T06:59:...|"pattern not matc...|
|2022-04-27T06:59:...|"pattern not matc...|
|2022-04-27T06:59:...|"pattern not matc...|
|2022-04-27T06:59:...|"pattern not matc...|
|2022-04-27T06:59:...|"pattern not matc...|
|2022-04-27T06:59:...|"pattern not matc...|
|2022-04-27T06:59:...|"pattern not matc...|
|2022-04-27T06:59:...|"pattern not matc...|
|2022-04-27T06:59:...|"pattern not matc...|
|2022-04-27T06:59:...|"pattern not matc...|
|2022-04-27T06:59:...|"pattern not matc...|
|2022-04-27T06:59:...|"pattern not matc

In [9]:
#SQL
df2.createTempView("log")
spark.sql("""SELECT * FROM log LIMIT 3 """).show()

+--------------------+--------------------+
|                time|             content|
+--------------------+--------------------+
|2022-04-27T06:59:...|"pattern not matc...|
|2022-04-27T06:59:...|"pattern not matc...|
|2022-04-27T06:59:...|"pattern not matc...|
+--------------------+--------------------+



In [10]:
#DSL
df2.where("time='2022-04-27T06:59:42.575644409+00:00'").show()

+--------------------+--------------------+
|                time|             content|
+--------------------+--------------------+
|2022-04-27T06:59:...|"pattern not matc...|
+--------------------+--------------------+



In [15]:
# RDD to DataFrame 1
rdd = sc.textFile("people.txt").\
    map(lambda x: x.split(",")).\
    map(lambda x: (x[0],int(x[1])))

#convert to DataFrame
#para1: 被轉換的RDD
#para2: 欄位名稱，透過list方式指定，按照順序提供
df = spark.createDataFrame(rdd, schema=['name','age'])

#輸出表結構
df.printSchema()

#輸出df中的數據
#para1: 輸出筆數，default=20
#para2: 是否截斷，如果數據長度超過20後續的內容是否顯示，True->截斷，False->不截斷
df.show()

#將df對象轉換成臨時視圖表，可供SQL語句查詢
df.createOrReplaceTempView("people")
spark.sql("SELECT * FROM people WHERE age < 30").show()

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)

+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+

+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
| Justin| 19|
+-------+---+



In [17]:
# RDD to DataFrame 2 (StructType)
from pyspark.sql.types import StructType, StringType, IntegerType

rdd = sc.textFile("people.txt").\
    map(lambda x: x.split(",")).\
    map(lambda x: (x[0],int(x[1])))

schema = StructType().add("name",StringType(),nullable=True).\
            add("age",IntegerType(),nullable=False)

df = spark.createDataFrame(rdd, schema=schema)

df.printSchema()
df.show()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = false)

+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+



In [20]:
# RDD to DataFrame 3 (toDF)
from pyspark.sql.types import StructType, StringType, IntegerType

rdd = sc.textFile("people.txt").\
    map(lambda x: x.split(",")).\
    map(lambda x: (x[0],int(x[1])))


#直接放List給欄位名稱這種方式類型只能靠推斷，對類型不敏感要快速創建使用
df1 = rdd.toDF(["name","age"])
df1.printSchema()
df1.show()

#透過StructType才可以指定Type & Name
schema = StructType().add("name",StringType(),nullable=True).\
            add("age",IntegerType(),nullable=False)
df2 = rdd.toDF(schema)
df2.printSchema()
df2.show()

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)

+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = false)

+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+



In [21]:
# RDD to DataFrame 4 (基於Pandas的DataFrame)
from pyspark.sql.types import StructType, StringType, IntegerType
import pandas as pd

pdf = pd.DataFrame(
    {
        "id":[1,2,3],
        "name": ["Amy","Bob","Chris"],
        "age": [11, 21, 11]
    }
)

df = spark.createDataFrame(pdf)
df.printSchema()
df.show()


root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)

+---+-----+---+
| id| name|age|
+---+-----+---+
|  1|  Amy| 11|
|  2|  Bob| 21|
|  3|Chris| 11|
+---+-----+---+



In [5]:
# 統一API讀取
#create SparkSession
spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()

#get SparkContext by SparkSession
sc = spark.sparkContext

#對於text是直接讀取整個列當作一筆資料，默認的欄位名稱是value類型是string
schema = StructType().add("data",StringType(),nullable=True)
df = spark.read.format("text").schema(schema).load("people.txt")

df.printSchema()
df.show()

root
 |-- data: string (nullable = true)

+-----------+
|       data|
+-----------+
|Michael, 29|
|   Andy, 30|
| Justin, 19|
+-----------+



In [7]:
#JSON自帶schema訊息
df = spark.read.format("json").load("people.json")
df.printSchema()
df.show()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [8]:
df = spark.read.format("csv").option("sep",",").option("header",True).option("encoding","utf-8").schema("time STRING,content STRING").load("logstash-2022.04.27.csv")
df.printSchema()
df.show()

root
 |-- time: string (nullable = true)
 |-- content: string (nullable = true)

+--------------------+--------------------+
|                time|             content|
+--------------------+--------------------+
|2022-04-27T06:59:...|"pattern not matc...|
|2022-04-27T06:59:...|"pattern not matc...|
|2022-04-27T06:59:...|"pattern not matc...|
|2022-04-27T06:59:...|"pattern not matc...|
|2022-04-27T06:59:...|"pattern not matc...|
|2022-04-27T06:59:...|"pattern not matc...|
|2022-04-27T06:59:...|"pattern not matc...|
|2022-04-27T06:59:...|"pattern not matc...|
|2022-04-27T06:59:...|"pattern not matc...|
|2022-04-27T06:59:...|"pattern not matc...|
|2022-04-27T06:59:...|"pattern not matc...|
|2022-04-27T06:59:...|"pattern not matc...|
|2022-04-27T06:59:...|"pattern not matc...|
|2022-04-27T06:59:...|"pattern not matc...|
|2022-04-27T06:59:...|"pattern not matc...|
|2022-04-27T06:59:...|"pattern not matc...|
|2022-04-27T06:59:...|"pattern not matc...|
|2022-04-27T06:59:...|"pattern not matc

In [None]:
df = spark.read.format("parquet").load("users.parquet")
df.printSchema()
df.show()

In [17]:
df = spark.read.format("csv").schema("id INT, subject STRING, score INT").load("stu_score.txt")

# get column
id_column = df["id"]
subject_column = df["subject"]

#DSL
df.select(["id","subject"]).show()
df.select("id","subject").show()
df.select(id_column,subject_column).show()

#filter API
df.filter("score < 99").show()
df.filter(df['score'] < 99).show()

#where API
df.where("score < 99").show()
df.where(df['score'] < 99).show()

#groupby API
df.groupBy("subject").count().show()
df.groupBy(df['subject']).count().show()

# 回傳值是一個有分組關係的數據結構(GroupedData)不是DataFrame，調用聚合方法後回傳值還是DataFrame
r = df.groupBy("subject")
print(type(r))

+---+-------+
| id|subject|
+---+-------+
|  1|English|
|  2|   Math|
+---+-------+

+---+-------+
| id|subject|
+---+-------+
|  1|English|
|  2|   Math|
+---+-------+

+---+-------+
| id|subject|
+---+-------+
|  1|English|
|  2|   Math|
+---+-------+

+---+-------+-----+
| id|subject|score|
+---+-------+-----+
|  1|English|   90|
|  2|   Math|   95|
+---+-------+-----+

+---+-------+-----+
| id|subject|score|
+---+-------+-----+
|  1|English|   90|
|  2|   Math|   95|
+---+-------+-----+

+---+-------+-----+
| id|subject|score|
+---+-------+-----+
|  1|English|   90|
|  2|   Math|   95|
+---+-------+-----+

+---+-------+-----+
| id|subject|score|
+---+-------+-----+
|  1|English|   90|
|  2|   Math|   95|
+---+-------+-----+

+-------+-----+
|subject|count|
+-------+-----+
|   Math|    1|
|English|    1|
+-------+-----+

+-------+-----+
|subject|count|
+-------+-----+
|   Math|    1|
|English|    1|
+-------+-----+

<class 'pyspark.sql.group.GroupedData'>


In [18]:
df = spark.read.format("csv").schema("id INT, subject STRING, score INT").load("stu_score.txt")

#SQL
df.createTempView("score")
df.createOrReplaceTempView("score2")
df.createGlobalTempView("score3") 

spark.sql("SELECT subject, COUNT(*) AS cnt FROM score GROUP BY subject").show()
spark.sql("SELECT subject, COUNT(*) AS cnt FROM score2 GROUP BY subject").show()
spark.sql("SELECT subject, COUNT(*) AS cnt FROM global_temp.score3 GROUP BY subject").show()

+-------+---+
|subject|cnt|
+-------+---+
|   Math|  1|
|English|  1|
+-------+---+

+-------+---+
|subject|cnt|
+-------+---+
|   Math|  1|
|English|  1|
+-------+---+

+-------+---+
|subject|cnt|
+-------+---+
|   Math|  1|
|English|  1|
+-------+---+



In [22]:
rdd = sc.textFile("words.txt").flatMap(lambda x: x.split(" ")).map(lambda x: [x])
df = rdd.toDF(["word"])

#SQL
df.createOrReplaceTempView("words")
spark.sql("SELECT word, count(*) AS cnt FROM words GROUP BY word ORDER BY cnt DESC").show()

#DSL
from pyspark.sql import functions as F
df = spark.read.format("text").load("words.txt")

#withColumn 對column操作，有更新成新列保留沒有就取代掉舊的
df2 = df.withColumn("value", F.explode(F.split(df['value']," ")))
df2.groupBy("value").count().withColumnRenamed("value","word").withColumnRenamed("count","cnt").orderBy("cnt",ascending=False).show()

+------+---+
|  word|cnt|
+------+---+
| hello|  3|
| spark|  1|
|hadoop|  1|
| flink|  1|
+------+---+

+------+---+
|  word|cnt|
+------+---+
| hello|  3|
| spark|  1|
|hadoop|  1|
| flink|  1|
+------+---+



In [36]:
schema = StructType().add("user_id",StringType(),nullable=False).\
    add("movie_id",IntegerType(),nullable=False).\
    add("rank",IntegerType(),nullable=False).\
    add("ts",StringType(),nullable=False)
df = spark.read.format("csv").option("sep","\t").option("header",False).option("encoding","utf-8").schema(schema).load("/home/cris/ml-100k/u.data")

# 用戶平均分
df.groupBy("user_id").avg("rank").withColumnRenamed("avg(rank)","avg_rank").withColumn("avg_rank",F.round("avg_rank",2)).orderBy("avg_rank",ascending=False).show()

# 電影平均分
df.createOrReplaceTempView("movie")
spark.sql("SELECT movie_id, ROUND(AVG(rank),2) AS avg_rank FROM movie GROUP BY movie_id ORDER BY avg_rank DESC").show()

# 查詢大於平均分數電影的評分數量
print("大於平均分數電影的評分數量:", df.where(df['rank'] > df.select(F.avg(df['rank'])).first()['avg(rank)']).count())

# 查詢高分電影(分數>3)評分次數最多的用戶，他的平均評分
# 找到此人 first>get first row
user_id = df.where("rank > 3").groupBy("user_id").count().withColumnRenamed("count","cnt").orderBy("cnt",ascending=False).limit(1).first()['user_id']

# 計算這個人的平均評分
df.filter(df['user_id'] == user_id).select(F.round(F.avg("rank"),2)).show()

# 查詢每個用戶的平均評分,最高評分,最低評分
df.groupBy("user_id").\
    agg(
        F.round(F.avg("rank"),2).alias("avg_rank"),
        F.min("rank").alias("min_rank"),
        F.max("rank").alias("max_rank")
    ).show()

# 查詢評分超過100次的電影 平均評分排名TOP10的
df.groupBy("movie_id").\
    agg(
        F.count("movie_id").alias("cnt"),
        F.round(F.avg("rank"),2).alias("avg_rank")
    ).where("cnt > 100").\
    orderBy("avg_rank",ascending=False).\
    limit(10).\
    show()

# agg: GroupedData對象的API，作用是可以在裡面寫多個聚合
# alias: Column對象的API，針對單一個欄位名稱改名
# withColumnRenamed: DataFrame對象的API，可以對DF中的欄位進行改名，一次可以改1~多
# orderBy: DataFrame對象的API，進行排序，para1:被排序的欄位，para2: 升序True 降序False
# first: DataFrame對象的API，取出DF的first row，回傳值是ROW對象
# Row對象: 就是一個數組，可以透過row['欄位名']取出欄位的具體數值

+-------+--------+
|user_id|avg_rank|
+-------+--------+
|    849|    4.87|
|    688|    4.83|
|    507|    4.72|
|    628|     4.7|
|    928|    4.69|
|    118|    4.66|
|    907|    4.57|
|    686|    4.56|
|    427|    4.55|
|    565|    4.54|
|    850|    4.53|
|    469|    4.53|
|    225|    4.52|
|    330|     4.5|
|    477|    4.46|
|    242|    4.45|
|    636|    4.45|
|    583|    4.44|
|    767|    4.43|
|    252|    4.43|
+-------+--------+
only showing top 20 rows

+--------+--------+
|movie_id|avg_rank|
+--------+--------+
|    1467|     5.0|
|     814|     5.0|
|    1653|     5.0|
|    1536|     5.0|
|    1293|     5.0|
|    1122|     5.0|
|    1201|     5.0|
|    1189|     5.0|
|    1599|     5.0|
|    1500|     5.0|
|    1449|    4.63|
|    1398|     4.5|
|    1642|     4.5|
|    1594|     4.5|
|     119|     4.5|
|     408|    4.49|
|     318|    4.47|
|     169|    4.47|
|     483|    4.46|
|     114|    4.45|
+--------+--------+
only showing top 20 rows

大於平均分數電影的評分數

In [10]:
df = spark.read.format("csv").option("sep",";").option("header",True).load("people.csv")

#去重複(對全部欄位)
df.dropDuplicates().show()

#針對特定欄位去重複
df.dropDuplicates(['age','job']).show()

#缺失值處理，有n/a值整列刪除
df.dropna().show()
df.dropna(thresh=3).show()
df.dropna(thresh=2,subset=['name','age']).show()

#將缺失值填充
df.fillna("loss").show()
df.fillna("N/A", subset=['job']).show()

#設定字典對所有欄位加入填充規則
df.fillna({"name":"未知姓名","age":1,"job":"worker"}).show()

+-----+----+---------+
| name| age|      job|
+-----+----+---------+
|Peter|  11|Developer|
|Alice|   9|     null|
|  Amy|  11|Developer|
|Alice|   9|  Manager|
|Alice|null|  Manager|
| Lily|  11|  Manager|
|jorge|  30|Developer|
|  Bob|  32|Developer|
+-----+----+---------+

+-----+----+---------+
| name| age|      job|
+-----+----+---------+
|jorge|  30|Developer|
| Lily|  11|  Manager|
|Alice|   9|  Manager|
|Alice|   9|     null|
|Alice|null|  Manager|
|  Amy|  11|Developer|
|  Bob|  32|Developer|
+-----+----+---------+

+-----+---+---------+
| name|age|      job|
+-----+---+---------+
|jorge| 30|Developer|
|  Bob| 32|Developer|
|  Amy| 11|Developer|
| Lily| 11|  Manager|
|Peter| 11|Developer|
|Alice|  9|  Manager|
|Alice|  9|  Manager|
|Alice|  9|  Manager|
+-----+---+---------+

+-----+---+---------+
| name|age|      job|
+-----+---+---------+
|jorge| 30|Developer|
|  Bob| 32|Developer|
|  Amy| 11|Developer|
| Lily| 11|  Manager|
|Peter| 11|Developer|
|Alice|  9|  Manager|
|Alice

In [13]:
schema = StructType().add("user_id",StringType(),nullable=False).\
    add("movie_id",IntegerType(),nullable=False).\
    add("rank",IntegerType(),nullable=False).\
    add("ts",StringType(),nullable=False)
df = spark.read.format("csv").option("sep","\t").option("header",False).option("encoding","utf-8").schema(schema).load("/home/cris/ml-100k/u.data")

# write text
df.select(F.concat_ws("---","user_id","movie_id","rank","ts")).write.mode("overwrite").format("text").save("/home/cris/Documents/text")

# csv
df.write.mode("overwrite").format("csv").option("sep", ";").option("header", True).save("/home/cris/Documents/csv")

# json
df.write.mode("overwrite").format("json").save("/home/cris/Documents/json")

# parquet
df.write.mode("overwrite").format("parquet").save("/home/cris/Documents/parquet")

# jdbc, need to install driver first
df.write.mode("overwrite").\
    format("jdbc").\
    option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true").\
    option("dbtable", "movie_data").\
    option("user", "root").\
    option("password", "123456").\
    save()

spark.read.format("jdbc").\
    option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true").\
    option("dbtable", "movie_data").\
    option("user", "root").\
    option("password", "123456").\
    load()

In [17]:
rdd = sc.parallelize([1,2,3,4,5,6,7]).map(lambda x:[x])
df = rdd.toDF(["num"])

def num_ride_10(num):
    return num * 10

#1 sparksession.udf.register() DSL&SQL均可使用
#para1: 創建的udf名稱，僅可以用於SQL
#para2: 處理邏輯，是一個單獨的方法
#para3: udf的回傳值類型，真實的回傳值一定要和聲明的一致
#回傳值對象僅可以用於DSL風格
udf2 = spark.udf.register("udf1", num_ride_10, IntegerType())

#SQL中使用 selectExpr:以SQL表達式執行
df.selectExpr("udf1(num)").show()

#DSL中使用，注意欄位名稱還是被註冊的udf1
df.select(udf2(df['num'])).show()

#2 pyspark.sql.functions.udf，僅能用於DSL風格，名稱為函數名稱
udf3 = F.udf(num_ride_10, IntegerType())
df.select(udf3(df['num'])).show()

+---------+
|udf1(num)|
+---------+
|       10|
|       20|
|       30|
|       40|
|       50|
|       60|
|       70|
+---------+

+---------+
|udf1(num)|
+---------+
|       10|
|       20|
|       30|
|       40|
|       50|
|       60|
|       70|
+---------+

+----------------+
|num_ride_10(num)|
+----------------+
|              10|
|              20|
|              30|
|              40|
|              50|
|              60|
|              70|
+----------------+



In [22]:
rdd = sc.parallelize([["hadoop spark flink"],["hadoop flink java"]])
df = rdd.toDF(["line"])

def split_line(data):
    return data.split(" ") #回傳值是Array

udf2 = spark.udf.register("udf1", split_line, ArrayType(StringType()))
#DSL風格
df.select(udf2(df['line'])).show()
#SQL風格
df.createOrReplaceTempView("lines")
spark.sql("SELECT udf1(line) FROM lines").show(truncate=False)

udf3 = F.udf(split_line, ArrayType(StringType()))
df.select(udf3(df['line'])).show()

+--------------------+
|          udf1(line)|
+--------------------+
|[hadoop, spark, f...|
|[hadoop, flink, j...|
+--------------------+

+----------------------+
|udf1(line)            |
+----------------------+
|[hadoop, spark, flink]|
|[hadoop, flink, java] |
+----------------------+

+--------------------+
|    split_line(line)|
+--------------------+
|[hadoop, spark, f...|
|[hadoop, flink, j...|
+--------------------+



In [26]:
rdd = sc.parallelize([[1],[2],[3]])
df = rdd.toDF(["num"])

def process(data):
    return {"num": data, "letters": string.ascii_letters[data]}

#udf回傳值是字典的話要用StructType來實作

udf1 = spark.udf.register("udf1", process, StructType().add("num", IntegerType(), nullable=True).add("letters", StringType(), nullable=True))
df.selectExpr("udf1(num)").show(truncate=False)
df.select(udf1(df['num'])).show(truncate=False)

+---------+
|udf1(num)|
+---------+
|{1, b}   |
|{2, c}   |
|{3, d}   |
+---------+

+---------+
|udf1(num)|
+---------+
|{1, b}   |
|{2, c}   |
|{3, d}   |
+---------+



In [28]:
rdd = sc.parallelize([1 , 2, 3, 4, 5], 3)
df = rdd.map(lambda x: [x]).toDF(['num'])

# 折衷方式達成UDAF 利用RDD算子達到聚合的操作
# 從DataFrame進行repartition回傳是row對象-->因為是從DataFrame去重新分區
single_partition_rdd = df.rdd.repartition(1)

def process(iter):
    sum = 0
    for row in iter:
        sum += row['num']
    return [sum]  # 這裡要回傳list，因為mapPartition方法支持的是list對象

print(single_partition_rdd.mapPartitions(process).collect())

[15]


In [41]:
rdd = sc.parallelize([(1,"國語",90),(2,"數學",91),(3,"英語",92),(2,"英語",96),(2,"國語",91),(1,"數學",94),(3,"數學",95),(1,"英語",91),(3,"國語",91)])
schema = StructType().add("name", StringType()).add("class", StringType()).add("score", IntegerType())
df = rdd.toDF(schema)

df.createOrReplaceTempView("stu")

# 聚合窗口函數
spark.sql("SELECT *, AVG(score) OVER() AS avg_score  FROM stu").show()

# 排序窗口函數
spark.sql("""
    SELECT *, ROW_NUMBER() OVER(ORDER BY score DESC) AS row_number_rank, 
    DENSE_RANK() OVER(PARTITION BY class ORDER BY score DESC) AS dense_rank,
    RANK() OVER(ORDER BY score) AS rank
    FROM stu
    """).show()

# NTILE分組窗口函數
spark.sql("""
    SELECT *, NTILE(6) OVER(ORDER BY score DESC) FROM stu
    """).show()


+----+-----+-----+-----------------+
|name|class|score|        avg_score|
+----+-----+-----+-----------------+
|   1| 國語|   90|92.33333333333333|
|   2| 數學|   91|92.33333333333333|
|   3| 英語|   92|92.33333333333333|
|   2| 英語|   96|92.33333333333333|
|   2| 國語|   91|92.33333333333333|
|   1| 數學|   94|92.33333333333333|
|   3| 數學|   95|92.33333333333333|
|   1| 英語|   91|92.33333333333333|
|   3| 國語|   91|92.33333333333333|
+----+-----+-----+-----------------+

+----+-----+-----+---------------+----------+----+
|name|class|score|row_number_rank|dense_rank|rank|
+----+-----+-----+---------------+----------+----+
|   2| 國語|   91|              6|         1|   2|
|   3| 國語|   91|              8|         1|   2|
|   1| 國語|   90|              9|         2|   1|
|   3| 數學|   95|              2|         1|   8|
|   1| 數學|   94|              3|         2|   7|
|   2| 數學|   91|              5|         3|   2|
|   2| 英語|   96|              1|         1|   9|
|   3| 英語|   92|              4|        

In [1]:
pip install koalas

Collecting koalas
  Downloading koalas-1.8.2-py3-none-any.whl (390 kB)
[K     |████████████████████████████████| 390 kB 1.2 MB/s eta 0:00:01
Installing collected packages: koalas
Successfully installed koalas-1.8.2
Note: you may need to restart the kernel to use updated packages.


In [5]:
import pandas as pd
import numpy as np
import databricks.koalas as ks

In [27]:
dates = pd.date_range('20130101', periods=6)
pdf = pd.DataFrame(np.random.randn(6,4), index=dates, columns=list('ABCD'))
kdf = ks.from_pandas(pdf) #底層跑spark的分布式計算
type(kdf)
sdf = spark.createDataFrame(pdf)
kdf = sdf.to_koalas()
kdf
ks.DataFrame({'A':['foo', 'bar'], 'B': [1, 2]})
kdf.head()
kdf.index
kdf.columns
kdf.to_numpy()
kdf.describe()
kdf.T
pdf1 = pdf.reindex(index=dates[0:4], columns=list(pdf.columns)+['E'])
pdf1.loc[dates[0]:dates[1], 'E'] = 1
pdf1
kdf1 = ks.from_pandas(pdf1)
kdf1.dropna(how='any')
kdf1.fillna(value=5)
kdf.groupby('A').sum()
kdf.to_csv('foo.csv')
ks.read_csv('foo.csv').head(10)
kdf.to_parquet('bar.parquet')
ks.read_parquet('bar.parquet')
kdf.to_spark_io('zoo.orc', format="orc") #調用spark io
ks.read_spark_io('zoo.orc', format="orc").head(10)

Unnamed: 0_level_0,B,C,D
A,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
-0.751916,0.332331,-2.377288,1.663634
-2.000716,0.91449,-1.668963,-1.13285
-0.514297,0.937845,0.183965,2.681256
0.026387,1.104478,-1.312707,-0.134974
-0.172941,1.11082,0.477678,1.788781
-0.896663,-0.741169,-0.072685,1.004584
