In [1]:
%%time

import pathlib
from os.path import expanduser, join, abspath

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as fn
from pyspark.sql.types import *
from pyspark.sql.window import Window


warehouse_location = abspath('/home/jovyan/work/hive-db/spark-warehouse')
conf_metastore_db = ("spark.driver.extraJavaOptions", "-Dderby.system.home=/home/jovyan/work/hive-db")
# https://www.ibm.com/support/knowledgecenter/en/SS3H8V_1.1.0/com.ibm.izoda.v1r1.azka100/topics/azkic_t_updconfigfiles.htm

spark = SparkSession \
        .builder \
        .config("spark.sql.warehouse.dir", warehouse_location) \
        .config(*conf_metastore_db) \
        .enableHiveSupport() \
        .appName("local-test") \
        .getOrCreate()

spark



CPU times: user 332 ms, sys: 74.7 ms, total: 407 ms
Wall time: 6.93 s


In [3]:
%%time
spark.sql("show tables from tmp").toPandas()

CPU times: user 311 ms, sys: 79.9 ms, total: 391 ms
Wall time: 8.92 s


Unnamed: 0,database,tableName,isTemporary
0,tmp,iris,False
1,tmp,timeseries_test1,False


In [4]:
%%time
spark.sql("describe table tmp.timeseries_test1").show()

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|           timestamp|timestamp|   null|
|              value1|   double|   null|
|              value2|   double|   null|
|                  id|   bigint|   null|
|# Partition Infor...|         |       |
|          # col_name|data_type|comment|
|                  id|   bigint|   null|
+--------------------+---------+-------+

CPU times: user 3.37 ms, sys: 0 ns, total: 3.37 ms
Wall time: 1.29 s


In [5]:
%%time
spark.sql("describe table tmp.timeseries_test1").toPandas()

CPU times: user 10.2 ms, sys: 0 ns, total: 10.2 ms
Wall time: 196 ms


Unnamed: 0,col_name,data_type,comment
0,timestamp,timestamp,
1,value1,double,
2,value2,double,
3,id,bigint,
4,# Partition Information,,
5,# col_name,data_type,comment
6,id,bigint,


In [10]:
df = spark.table("tmp.timeseries_test1")
df.show()

+--------------------+--------------------+--------------------+---+
|           timestamp|              value1|              value2| id|
+--------------------+--------------------+--------------------+---+
|2020-02-29 12:06:...|  2.0660433825691515|  0.3048488386829947|  7|
|2020-02-29 12:06:...|   1.242542033226673|  0.7858712362510857|  7|
|2020-02-29 12:06:...|   0.996555747537096| 0.03680071882528735|  7|
|2020-02-29 12:08:...|  2.5032296342232634|  0.7682548549528085|  7|
|2020-02-29 12:09:...| 0.23398213268880763| 0.25270079319093086|  7|
|2020-02-29 12:09:...|  0.9347168153765555|  0.8631755178694166|  7|
|2020-02-29 12:09:...|   1.904715780856427| 0.23133236192424844|  7|
|2020-02-29 12:09:...|  0.7728127979487421|  0.9484733031145899|  7|
|2020-02-29 12:10:...| 0.37409783574282696|  0.7815769744194903|  7|
|2020-02-29 12:11:...|-0.19286551076461178|  0.7541660630389332|  7|
|2020-02-29 12:11:...|  2.3627089070678258|  0.4312587684799759|  7|
|2020-02-29 12:12:...|  1.39628443

In [12]:
%%time
df2 = df.withColumn("date", fn.to_date(fn.col("timestamp")))
df2.show()

+--------------------+--------------------+--------------------+---+----------+
|           timestamp|              value1|              value2| id|      date|
+--------------------+--------------------+--------------------+---+----------+
|2020-02-29 12:06:...|  2.0660433825691515|  0.3048488386829947|  7|2020-02-29|
|2020-02-29 12:06:...|   1.242542033226673|  0.7858712362510857|  7|2020-02-29|
|2020-02-29 12:06:...|   0.996555747537096| 0.03680071882528735|  7|2020-02-29|
|2020-02-29 12:08:...|  2.5032296342232634|  0.7682548549528085|  7|2020-02-29|
|2020-02-29 12:09:...| 0.23398213268880763| 0.25270079319093086|  7|2020-02-29|
|2020-02-29 12:09:...|  0.9347168153765555|  0.8631755178694166|  7|2020-02-29|
|2020-02-29 12:09:...|   1.904715780856427| 0.23133236192424844|  7|2020-02-29|
|2020-02-29 12:09:...|  0.7728127979487421|  0.9484733031145899|  7|2020-02-29|
|2020-02-29 12:10:...| 0.37409783574282696|  0.7815769744194903|  7|2020-02-29|
|2020-02-29 12:11:...|-0.192865510764611

In [17]:
pdf2 = pd.DataFrame(df2.rdd.collect(), columns=df2.columns)
pdf2

Unnamed: 0,timestamp,value1,value2,id,date
0,2020-02-29 12:06:45.500,2.066043,0.304849,7,2020-02-29
1,2020-02-29 12:06:50.500,1.242542,0.785871,7,2020-02-29
2,2020-02-29 12:06:55.500,0.996556,0.036801,7,2020-02-29
3,2020-02-29 12:08:30.500,2.503230,0.768255,7,2020-02-29
4,2020-02-29 12:09:35.500,0.233982,0.252701,7,2020-02-29
...,...,...,...,...,...
295,2020-02-29 12:00:10.500,1.171290,0.684681,7,2020-02-29
296,2020-02-29 12:02:55.500,1.245893,0.852530,7,2020-02-29
297,2020-02-29 12:04:10.500,-0.393725,0.398753,7,2020-02-29
298,2020-02-29 12:00:15.500,0.639097,0.474802,5,2020-02-29


In [20]:
df2.write.saveAsTable("tmp.timeseries_test2", bucketBy="id", sortBy='timestamp')

In [22]:
spark.table("tmp.timeseries_test2").show()

+--------------------+--------------------+--------------------+---+----------+
|           timestamp|              value1|              value2| id|      date|
+--------------------+--------------------+--------------------+---+----------+
|2020-02-29 12:06:...|  2.0660433825691515|  0.3048488386829947|  7|2020-02-29|
|2020-02-29 12:06:...|   1.242542033226673|  0.7858712362510857|  7|2020-02-29|
|2020-02-29 12:06:...|   0.996555747537096| 0.03680071882528735|  7|2020-02-29|
|2020-02-29 12:08:...|  2.5032296342232634|  0.7682548549528085|  7|2020-02-29|
|2020-02-29 12:09:...| 0.23398213268880763| 0.25270079319093086|  7|2020-02-29|
|2020-02-29 12:09:...|  0.9347168153765555|  0.8631755178694166|  7|2020-02-29|
|2020-02-29 12:09:...|   1.904715780856427| 0.23133236192424844|  7|2020-02-29|
|2020-02-29 12:09:...|  0.7728127979487421|  0.9484733031145899|  7|2020-02-29|
|2020-02-29 12:10:...| 0.37409783574282696|  0.7815769744194903|  7|2020-02-29|
|2020-02-29 12:11:...|-0.192865510764611

In [23]:
spark.table("tmp.timeseries_test2").createTempView("tmp")

In [24]:
df2.show()

+--------------------+--------------------+--------------------+---+----------+
|           timestamp|              value1|              value2| id|      date|
+--------------------+--------------------+--------------------+---+----------+
|2020-02-29 12:06:...|  2.0660433825691515|  0.3048488386829947|  7|2020-02-29|
|2020-02-29 12:06:...|   1.242542033226673|  0.7858712362510857|  7|2020-02-29|
|2020-02-29 12:06:...|   0.996555747537096| 0.03680071882528735|  7|2020-02-29|
|2020-02-29 12:08:...|  2.5032296342232634|  0.7682548549528085|  7|2020-02-29|
|2020-02-29 12:09:...| 0.23398213268880763| 0.25270079319093086|  7|2020-02-29|
|2020-02-29 12:09:...|  0.9347168153765555|  0.8631755178694166|  7|2020-02-29|
|2020-02-29 12:09:...|   1.904715780856427| 0.23133236192424844|  7|2020-02-29|
|2020-02-29 12:09:...|  0.7728127979487421|  0.9484733031145899|  7|2020-02-29|
|2020-02-29 12:10:...| 0.37409783574282696|  0.7815769744194903|  7|2020-02-29|
|2020-02-29 12:11:...|-0.192865510764611

In [29]:
df2.write.bucketBy(10, col="id").sortBy("timestamp").saveAsTable("tmp.timeseries_test3", compression="zlib", format="orc")

In [31]:
df3 = spark.table("tmp.timeseries_test3")
df3.show()

+--------------------+--------------------+--------------------+---+----------+
|           timestamp|              value1|              value2| id|      date|
+--------------------+--------------------+--------------------+---+----------+
|2020-02-29 12:20:...|-0.00755845767981...|  0.4750769845419758|  4|2020-02-29|
|2020-02-29 12:21:...|  0.6776624925637711|  0.7062482929290852|  4|2020-02-29|
|2020-02-29 12:22:...| -0.7216558611858177|  0.2919556856157721|  4|2020-02-29|
|2020-02-29 12:22:...|-0.11325754192073956| 0.13418269252149218|  4|2020-02-29|
|2020-02-29 12:23:...|  0.6401706147358921|0.029104250867859727|  4|2020-02-29|
|2020-02-29 12:23:...| -0.5755209961161585|  0.4747288262197418|  4|2020-02-29|
|2020-02-29 12:24:...|  2.2380209503288664|  0.6531309278891648|  4|2020-02-29|
|2020-02-29 12:24:...| -0.0821932335640485| 0.46275072994254707|  4|2020-02-29|
|2020-02-29 12:13:...| 0.06016206946548608|  0.6930656675807086|  4|2020-02-29|
|2020-02-29 12:14:...| -0.28915680497582

In [32]:
spark.sql("describe table tmp.timeseries_test3").toPandas()

Unnamed: 0,col_name,data_type,comment
0,timestamp,timestamp,
1,value1,double,
2,value2,double,
3,id,bigint,
4,date,date,


In [33]:
df3.write.saveAsTable("tmp.timeseries_test4",
                     format="orc", compression="zlib",
                     partitionBy="id", sortBy="timestamp",
                     mode="overwrite")

In [34]:
df4 = spark.table("tmp.timeseries_test4")
df4.show()

+--------------------+--------------------+--------------------+----------+---+
|           timestamp|              value1|              value2|      date| id|
+--------------------+--------------------+--------------------+----------+---+
|2020-02-29 12:12:...|  2.5340876903569978|  0.9524270303929632|2020-02-29|  6|
|2020-02-29 12:16:...|  2.2823237964136123|   0.772920545206864|2020-02-29|  6|
|2020-02-29 12:16:...|   2.132622036082666|  0.9866417283252772|2020-02-29|  6|
|2020-02-29 12:17:...|  0.4658492497223723|  0.4939304810316253|2020-02-29|  6|
|2020-02-29 12:18:...|  0.6872809611016502|  0.5196961868188861|2020-02-29|  6|
|2020-02-29 12:18:...|   2.056135309948681| 0.48023049232007986|2020-02-29|  6|
|2020-02-29 12:19:...|-0.27533302774850776|  0.4467243404880119|2020-02-29|  6|
|2020-02-29 12:19:...|   2.243374516273532| 0.12909914319582083|2020-02-29|  6|
|2020-02-29 12:21:...|   1.262637914206263|  0.7240378457531121|2020-02-29|  6|
|2020-02-29 12:24:...| -0.66980073245993

In [35]:
df3.write.saveAsTable("tmp.timeseries_test5",
                     format="orc", compression="zlib",
                     partitionBy="id", orderBy="timestamp",
                     mode="overwrite")

In [36]:
df5 = spark.table("tmp.timeseries_test5")
df5.show()

+--------------------+--------------------+--------------------+----------+---+
|           timestamp|              value1|              value2|      date| id|
+--------------------+--------------------+--------------------+----------+---+
|2020-02-29 12:12:...|  2.5340876903569978|  0.9524270303929632|2020-02-29|  6|
|2020-02-29 12:16:...|  2.2823237964136123|   0.772920545206864|2020-02-29|  6|
|2020-02-29 12:16:...|   2.132622036082666|  0.9866417283252772|2020-02-29|  6|
|2020-02-29 12:17:...|  0.4658492497223723|  0.4939304810316253|2020-02-29|  6|
|2020-02-29 12:18:...|  0.6872809611016502|  0.5196961868188861|2020-02-29|  6|
|2020-02-29 12:18:...|   2.056135309948681| 0.48023049232007986|2020-02-29|  6|
|2020-02-29 12:19:...|-0.27533302774850776|  0.4467243404880119|2020-02-29|  6|
|2020-02-29 12:19:...|   2.243374516273532| 0.12909914319582083|2020-02-29|  6|
|2020-02-29 12:21:...|   1.262637914206263|  0.7240378457531121|2020-02-29|  6|
|2020-02-29 12:24:...| -0.66980073245993

In [47]:
df2.repartition(10, "id").sortWithinPartitions('timestamp').show()

+--------------------+--------------------+-------------------+---+----------+
|           timestamp|              value1|             value2| id|      date|
+--------------------+--------------------+-------------------+---+----------+
|2020-02-29 12:00:...|  1.7736965502445505|0.46648262111200245|  4|2020-02-29|
|2020-02-29 12:01:...|  0.7086479420606693| 0.4127211979439286|  4|2020-02-29|
|2020-02-29 12:02:...|  0.7356685403999356|  0.519537998376152|  4|2020-02-29|
|2020-02-29 12:04:...|  1.4197425079617536|0.44486963544583547|  4|2020-02-29|
|2020-02-29 12:04:...|  1.3028101953461262| 0.2889424419138067|  4|2020-02-29|
|2020-02-29 12:05:...|  0.4678559290465544| 0.4946336643409025|  4|2020-02-29|
|2020-02-29 12:05:...|  2.1533717575325095|0.20276485842361336|  4|2020-02-29|
|2020-02-29 12:07:...|  0.9386582684589627| 0.9523395707346111|  4|2020-02-29|
|2020-02-29 12:07:...| 0.40974312470193475|0.30899722105489447|  4|2020-02-29|
|2020-02-29 12:07:...|  1.4001586527712115|   0.5179

In [48]:
df4.show()

+--------------------+--------------------+--------------------+----------+---+
|           timestamp|              value1|              value2|      date| id|
+--------------------+--------------------+--------------------+----------+---+
|2020-02-29 12:12:...|  2.5340876903569978|  0.9524270303929632|2020-02-29|  6|
|2020-02-29 12:16:...|  2.2823237964136123|   0.772920545206864|2020-02-29|  6|
|2020-02-29 12:16:...|   2.132622036082666|  0.9866417283252772|2020-02-29|  6|
|2020-02-29 12:17:...|  0.4658492497223723|  0.4939304810316253|2020-02-29|  6|
|2020-02-29 12:18:...|  0.6872809611016502|  0.5196961868188861|2020-02-29|  6|
|2020-02-29 12:18:...|   2.056135309948681| 0.48023049232007986|2020-02-29|  6|
|2020-02-29 12:19:...|-0.27533302774850776|  0.4467243404880119|2020-02-29|  6|
|2020-02-29 12:19:...|   2.243374516273532| 0.12909914319582083|2020-02-29|  6|
|2020-02-29 12:21:...|   1.262637914206263|  0.7240378457531121|2020-02-29|  6|
|2020-02-29 12:24:...| -0.66980073245993

In [54]:
df4.where(fn.rand() <= 4).count()

300

In [55]:
df.show()

+--------------------+--------------------+--------------------+---+
|           timestamp|              value1|              value2| id|
+--------------------+--------------------+--------------------+---+
|2020-02-29 12:06:...|  2.0660433825691515|  0.3048488386829947|  7|
|2020-02-29 12:06:...|   1.242542033226673|  0.7858712362510857|  7|
|2020-02-29 12:06:...|   0.996555747537096| 0.03680071882528735|  7|
|2020-02-29 12:08:...|  2.5032296342232634|  0.7682548549528085|  7|
|2020-02-29 12:09:...| 0.23398213268880763| 0.25270079319093086|  7|
|2020-02-29 12:09:...|  0.9347168153765555|  0.8631755178694166|  7|
|2020-02-29 12:09:...|   1.904715780856427| 0.23133236192424844|  7|
|2020-02-29 12:09:...|  0.7728127979487421|  0.9484733031145899|  7|
|2020-02-29 12:10:...| 0.37409783574282696|  0.7815769744194903|  7|
|2020-02-29 12:11:...|-0.19286551076461178|  0.7541660630389332|  7|
|2020-02-29 12:11:...|  2.3627089070678258|  0.4312587684799759|  7|
|2020-02-29 12:12:...|  1.39628443

In [63]:
df4.withColumn("tmp", fn.count('timestamp').over(Window.partitionBy('id'))).show()

+--------------------+--------------------+-------------------+----------+---+---+
|           timestamp|              value1|             value2|      date| id|tmp|
+--------------------+--------------------+-------------------+----------+---+---+
|2020-02-29 12:06:...|  1.5184796173506778| 0.3277533390590386|2020-02-29|  0| 33|
|2020-02-29 12:08:...|  0.6875405355181269|0.06868328345855046|2020-02-29|  0| 33|
|2020-02-29 12:09:...| -0.8022035149666866| 0.4292020703741175|2020-02-29|  0| 33|
|2020-02-29 12:09:...| -1.8711709347225063| 0.7375595941532431|2020-02-29|  0| 33|
|2020-02-29 12:10:...|0.028804745667309062|0.34182441033986044|2020-02-29|  0| 33|
|2020-02-29 12:10:...|  1.8332972810669386| 0.2535274742544955|2020-02-29|  0| 33|
|2020-02-29 12:00:...|   1.402237447500409|   0.80670051209233|2020-02-29|  0| 33|
|2020-02-29 12:01:...|  1.1463375383075516| 0.5636190575704605|2020-02-29|  0| 33|
|2020-02-29 12:02:...|    1.78648473518493|0.01768226570780751|2020-02-29|  0| 33|
|202

In [65]:
df4.groupBy('id').count().show()

+---+-----+
| id|count|
+---+-----+
|  0|   33|
|  7|   31|
|  6|   35|
|  9|   22|
|  5|   27|
|  1|   26|
|  3|   34|
|  8|   31|
|  2|   31|
|  4|   30|
+---+-----+



In [71]:
df4.groupBy('id').agg(fn.count('timestamp').alias('count')).orderBy(fn.col('id').desc()).show()

+---+-----+
| id|count|
+---+-----+
|  9|   22|
|  8|   31|
|  7|   31|
|  6|   35|
|  5|   27|
|  4|   30|
|  3|   34|
|  2|   31|
|  1|   26|
|  0|   33|
+---+-----+



In [72]:
df4.withColumn("count", fn.count('timestamp').over(Window.partitionBy('id')))\
    .withColumn("flg", (10 / fn.col("count")) =< fn.rand()).show()

+--------------------+--------------------+-------------------+----------+---+-----+-----+
|           timestamp|              value1|             value2|      date| id|count|  flg|
+--------------------+--------------------+-------------------+----------+---+-----+-----+
|2020-02-29 12:06:...|  1.5184796173506778| 0.3277533390590386|2020-02-29|  0|   33| true|
|2020-02-29 12:08:...|  0.6875405355181269|0.06868328345855046|2020-02-29|  0|   33| true|
|2020-02-29 12:09:...| -0.8022035149666866| 0.4292020703741175|2020-02-29|  0|   33| true|
|2020-02-29 12:09:...| -1.8711709347225063| 0.7375595941532431|2020-02-29|  0|   33| true|
|2020-02-29 12:10:...|0.028804745667309062|0.34182441033986044|2020-02-29|  0|   33| true|
|2020-02-29 12:10:...|  1.8332972810669386| 0.2535274742544955|2020-02-29|  0|   33| true|
|2020-02-29 12:00:...|   1.402237447500409|   0.80670051209233|2020-02-29|  0|   33|false|
|2020-02-29 12:01:...|  1.1463375383075516| 0.5636190575704605|2020-02-29|  0|   33|false|

In [80]:
df_tmp = df4.withColumn("count", fn.count('timestamp').over(Window.partitionBy('id')))

df_result = df_tmp.where( (10 / fn.col("count")) >= fn.rand()).drop("count")
df_result.show()
df_result.groupBy('id').count().show()

+--------------------+-------------------+-------------------+----------+---+
|           timestamp|             value1|             value2|      date| id|
+--------------------+-------------------+-------------------+----------+---+
|2020-02-29 12:06:...| 1.5184796173506778| 0.3277533390590386|2020-02-29|  0|
|2020-02-29 12:08:...| 0.6875405355181269|0.06868328345855046|2020-02-29|  0|
|2020-02-29 12:09:...|-1.8711709347225063| 0.7375595941532431|2020-02-29|  0|
|2020-02-29 12:02:...| 1.3926286424069434| 0.8907083499049936|2020-02-29|  0|
|2020-02-29 12:03:...| 0.6776132610583343|  0.982867889694063|2020-02-29|  0|
|2020-02-29 12:18:...| 1.8565858798802826| 0.4589806431171831|2020-02-29|  0|
|2020-02-29 12:19:...| 2.1060283565741367| 0.8692002390339924|2020-02-29|  0|
|2020-02-29 12:21:...|  1.778351688805241| 0.7031698025163136|2020-02-29|  0|
|2020-02-29 12:23:...| 0.1789888063954409| 0.5880579265395264|2020-02-29|  0|
|2020-02-29 12:21:...|-1.4617643825380942| 0.6309249371412888|20

In [81]:
df_tmp = df4.withColumn("count", fn.count('timestamp').over(Window.partitionBy('id')))

df_result = df_tmp.where( (5 / fn.col("count")) >= fn.rand()).drop("count")
df_result.show()
df_result.groupBy('id').count().show()

+--------------------+--------------------+-------------------+----------+---+
|           timestamp|              value1|             value2|      date| id|
+--------------------+--------------------+-------------------+----------+---+
|2020-02-29 12:06:...|  1.5184796173506778| 0.3277533390590386|2020-02-29|  0|
|2020-02-29 12:09:...| -1.8711709347225063| 0.7375595941532431|2020-02-29|  0|
|2020-02-29 12:18:...| 0.37799412118867526|0.29340046377325435|2020-02-29|  0|
|2020-02-29 12:19:...| -0.6457348822851219| 0.2947165920597048|2020-02-29|  0|
|2020-02-29 12:19:...|  0.8843374337863038|  0.691939967317213|2020-02-29|  0|
|2020-02-29 12:23:...|  0.1789888063954409| 0.5880579265395264|2020-02-29|  0|
|2020-02-29 12:04:...| -0.3937246834047361| 0.3987534806301314|2020-02-29|  7|
|2020-02-29 12:20:...|   1.193645121988298|0.42348347158768795|2020-02-29|  7|
|2020-02-29 12:11:...|  2.3627089070678258| 0.4312587684799759|2020-02-29|  7|
|2020-02-29 12:12:...|   1.019606648082048|  0.51781

In [84]:
df_tmp = df4.withColumn("count", fn.count('timestamp').over(Window.partitionBy('id')))

df_result = df_tmp.where( (50 / fn.col("count")) >= fn.rand()).drop("count")
df_result.show()
df_result.groupBy('id').count().sort('id').show()  # sort = orderBy

+--------------------+--------------------+-------------------+----------+---+
|           timestamp|              value1|             value2|      date| id|
+--------------------+--------------------+-------------------+----------+---+
|2020-02-29 12:06:...|  1.5184796173506778| 0.3277533390590386|2020-02-29|  0|
|2020-02-29 12:08:...|  0.6875405355181269|0.06868328345855046|2020-02-29|  0|
|2020-02-29 12:09:...| -0.8022035149666866| 0.4292020703741175|2020-02-29|  0|
|2020-02-29 12:09:...| -1.8711709347225063| 0.7375595941532431|2020-02-29|  0|
|2020-02-29 12:10:...|0.028804745667309062|0.34182441033986044|2020-02-29|  0|
|2020-02-29 12:10:...|  1.8332972810669386| 0.2535274742544955|2020-02-29|  0|
|2020-02-29 12:00:...|   1.402237447500409|   0.80670051209233|2020-02-29|  0|
|2020-02-29 12:01:...|  1.1463375383075516| 0.5636190575704605|2020-02-29|  0|
|2020-02-29 12:02:...|    1.78648473518493|0.01768226570780751|2020-02-29|  0|
|2020-02-29 12:02:...|  1.3926286424069434| 0.890708

In [86]:
df_result[['date']].limit(15).rdd.flatMap(lambda x:x).collect()

[datetime.date(2020, 2, 29),
 datetime.date(2020, 2, 29),
 datetime.date(2020, 2, 29),
 datetime.date(2020, 2, 29),
 datetime.date(2020, 2, 29),
 datetime.date(2020, 2, 29),
 datetime.date(2020, 2, 29),
 datetime.date(2020, 2, 29),
 datetime.date(2020, 2, 29),
 datetime.date(2020, 2, 29),
 datetime.date(2020, 2, 29),
 datetime.date(2020, 2, 29),
 datetime.date(2020, 2, 29),
 datetime.date(2020, 2, 29),
 datetime.date(2020, 2, 29)]

In [87]:
df_result[['value1']].limit(10).rdd.flatMap(lambda x:x).collect()

[1.5184796173506778,
 0.6875405355181269,
 -0.8022035149666866,
 -1.8711709347225063,
 0.028804745667309062,
 1.8332972810669386,
 1.402237447500409,
 1.1463375383075516,
 1.78648473518493,
 1.3926286424069434]

In [90]:
df.select(fn.col('timestamp').cast(StringType())).limit(15).rdd.flatMap(lambda x:x).collect()

['2020-02-29 12:06:45.5',
 '2020-02-29 12:06:50.5',
 '2020-02-29 12:06:55.5',
 '2020-02-29 12:08:30.5',
 '2020-02-29 12:09:35.5',
 '2020-02-29 12:09:45.5',
 '2020-02-29 12:09:50.5',
 '2020-02-29 12:09:55.5',
 '2020-02-29 12:10:25.5',
 '2020-02-29 12:11:50.5',
 '2020-02-29 12:11:55.5',
 '2020-02-29 12:12:00.5',
 '2020-02-29 12:12:25.5',
 '2020-02-29 12:00:30.5',
 '2020-02-29 12:00:35.5']

In [91]:
pd.DataFrame(df_result.rdd.collect(), columns=df_result.columns)

Unnamed: 0,timestamp,value1,value2,date,id
0,2020-02-29 12:06:15.500,1.518480,0.327753,2020-02-29,0
1,2020-02-29 12:08:00.500,0.687541,0.068683,2020-02-29,0
2,2020-02-29 12:09:25.500,-0.802204,0.429202,2020-02-29,0
3,2020-02-29 12:09:40.500,-1.871171,0.737560,2020-02-29,0
4,2020-02-29 12:10:10.500,0.028805,0.341824,2020-02-29,0
...,...,...,...,...,...
295,2020-02-29 12:02:35.500,0.735669,0.519538,2020-02-29,4
296,2020-02-29 12:04:00.500,1.419743,0.444870,2020-02-29,4
297,2020-02-29 12:04:40.500,1.302810,0.288942,2020-02-29,4
298,2020-02-29 12:05:30.500,0.467856,0.494634,2020-02-29,4


- ↑toPandas()ではdatetime.date型に変換出来ずエラーになるが、この方法ならエラー無く変換可能

## Note: 角度付きの矩形領域・楕円領域のIn-Out判定

### Parameter
- 基準中心点の緯度経度(lat_base, lon_base)と回転角:$\theta$、
    - 矩形の場合、縦横の長さ(a, b)
    - 楕円の場合、長軸・短軸の半径(a, b)

の計5つ


### Outline
1. 判定対象点の緯度経度について、基準中心点の緯度経度との比較から$X$, $Y$(`[m]` or `[km]`)の座標に変換
    - $X = c1 \times lon$、$Y = c2 \times lat$
    - $c1, c2$は準定数（はじめに基準点の緯度経度を元にして求める必要があるが、極端に広い領域でなければその後は定数のような扱いで良い）
        - $c1 := c1(lat\_base, lon\_base), c2 := c2(lat\_base, lon\_base)$
        - $lat, lon$は判定対象の各点の緯度経度
2. 回転角$\theta$にもとづいて座標変換（回転）し、$X^{'}$、$Y^{'}$に変換する
    - $X^{'} = X\cos(\theta) - Y\sin(\theta)$
    - $Y^{'} = X\sin(\theta) + Y\cos(\theta)$
3. 変換後の座標では単純な条件で判定可能
    - 矩形
        - ($|X^{'}| <= a$) && ($|Y^{'}| <= b$)
    - 楕円
        - $(\frac{X^{'}}{a})^2 + (\frac{Y^{'}}{b})^2 <= 1$
