Użyj każdej z tych funkcji:
* `unix_timestamp()` 
* `date_format()`
* `to_unix_timestamp()`
* `from_unixtime()`
* `to_date()` 
* `to_timestamp()` 
* `from_utc_timestamp()` 
* `to_utc_timestamp()`

In [0]:
from pyspark.sql.functions import *

# Sample rows: an ISO-8601 timestamp string, a large integer "unix" value
# (presumably epoch milliseconds - TODO confirm) and a "Date" label such as "May, 2021".
kolumny = ["timestamp", "unix", "Date"]
dane = [
    ("2015-03-22T14:13:34", 1646641525847, "May, 2021"),
    ("2015-03-22T15:03:18", 1646641557555, "Mar, 2021"),
    ("2015-03-22T14:38:39", 1646641578622, "Jan, 2021"),
]

# One derived column per date/time function from the task list above.
# The order of the withColumn calls matters: "from_unixtime" reads the
# "unix_timestamp" column, and "to_timestamp" / "to_utc_timestamp" read
# the "from_unixtime" column created just before them.
dataFrame = (
    spark.createDataFrame(dane, kolumny)
    .withColumn("unix_timestamp", unix_timestamp())
    .withColumn("date_format", date_format("timestamp", "dd/MM/yyyy"))
    .withColumn("to_unix_timestamp", unix_timestamp(current_date(), "yyyy-MM-dd"))
    .withColumn("from_unixtime", from_unixtime("unix_timestamp"))
    .withColumn("to_date", to_date("timestamp"))
    .withColumn("to_timestamp", to_timestamp("from_unixtime"))
    .withColumn("from_utc_timestamp", from_utc_timestamp("timestamp", "UTC"))
    .withColumn("to_utc_timestamp", to_utc_timestamp("from_unixtime", "UTC"))
)

display(dataFrame)

In [0]:
# Print the schema of dataFrame to inspect the type of each derived column.
dataFrame.printSchema()

## unix_timestamp(..) & cast(..)

Konwersja wartości typu **string** na **timestamp**.

Lokalizacja funkcji 
* `pyspark.sql.functions` in the case of Python
* `org.apache.spark.sql.functions` in the case of Scala & Java

## 1. Zmiana formatu wartości timestamp yyyy-MM-dd'T'HH:mm:ss 
`unix_timestamp(..)`

Dokumentacja API `unix_timestamp(..)`:
> Convert time string with given pattern (see <a href="http://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html" target="_blank">SimpleDateFormat</a>) to Unix time stamp (in seconds), return null if fail.

`SimpleDateFormat` is part of the Java API and provides support for parsing and formatting date and time values.

In [0]:
# Parse the ISO-8601 strings stored in "timestamp" into Unix epoch seconds.
# The column itself is passed (not current_timestamp()) so that the pattern is
# actually applied to the stored values instead of being ignored.
df2 = dataFrame.withColumn("timestamp", unix_timestamp("timestamp", "yyyy-MM-dd'T'HH:mm:ss"))
display(df2)

2. Zmień format zgodnie z klasą `SimpleDateFormat`: **yyyy-MM-dd HH:mm:ss**
  * a. Wyświetl schemat i dane, żeby sprawdzić, czy wartości się zmieniły

In [0]:
from pyspark.sql.functions import unix_timestamp, from_unixtime

# Show the schema before the conversion so the type change is visible.
df2.printSchema()
# Render the epoch-seconds value in "timestamp" as a "yyyy-MM-dd HH:mm:ss" string.
# from_unixtime already yields a string column, so no cast is needed and the
# cell no longer depends on StringType, which this notebook imports only in a later cell.
zmianaFormatu = df2.withColumn("timestamp", from_unixtime(df2["timestamp"], "yyyy-MM-dd HH:mm:ss"))
zmianaFormatu.printSchema()

In [0]:
# unix_timestamp: display zmianaFormatu, the DataFrame with the converted "timestamp" column.
display(zmianaFormatu)

## Stwórz nowe kolumny do DataFrame z wartościami year(..), month(..), dayofyear(..)

In [0]:
# date_format: parse the "dd/MM/yyyy" strings of the "date_format" column
# back into dates and add their year as a new "year" column.
year_of_formatted_date = year(to_date("date_format", "dd/MM/yyyy"))
yearDate = df2.withColumn("year", year_of_formatted_date)
display(yearDate)

In [0]:
# to_date(): add the month of the "to_date" column as a new "month" column.
# NOTE(review): "to_date" was produced by to_date("timestamp") in the first cell,
# so the "dd/MM/yyyy" pattern passed here looks redundant - confirm before removing.
month_of_date = month(to_date("to_date", "dd/MM/yyyy"))
toDate = df2.withColumn("month", month_of_date)
display(toDate)

In [0]:
# from_unixtime(): add the day of the month of the "from_unixtime" column as "day".
day_of_month = dayofmonth("from_unixtime")
fromUnix = df2.withColumn("day", day_of_month)
display(fromUnix)

In [0]:
# to_timestamp(): add the day of the year as "dayofyear".
# Reads the "to_timestamp" column so the cell matches its label; it used to
# re-read "from_unixtime", the column already covered by the previous cell.
toTimestamp = df2.withColumn("dayofyear", dayofyear("to_timestamp"))
display(toTimestamp)


In [0]:
# to_utc_timestamp(): add the minute as "minute".
# Reads the "to_utc_timestamp" column so the cell matches its label; it used to
# re-read "from_unixtime" instead.
toUtcTimestamp = df2.withColumn("minute", minute("to_utc_timestamp"))
display(toUtcTimestamp)



In [0]:
# from_utc_timestamp(): add the second as "second".
# Reads the "from_utc_timestamp" column so the cell matches its label; it used to
# re-read "from_unixtime" instead.
fromUtcTimestamp = df2.withColumn("second", second("from_utc_timestamp"))
display(fromUtcTimestamp)

In [0]:
%fs ls dbfs:/databricks-datasets/flights/

path,name,size,modificationTime
dbfs:/databricks-datasets/flights/README.md,README.md,412,1457766852000
dbfs:/databricks-datasets/flights/airport-codes-na.txt,airport-codes-na.txt,11411,1457749605000
dbfs:/databricks-datasets/flights/departuredelays.csv,departuredelays.csv,33396236,1457749605000


In [0]:
# Load the departure-delays CSV, treating the first line as a header and
# letting Spark infer the column types, then show the data and the schema.
df = (
    spark.read
    .format("csv")
    .options(header="true", inferSchema="true")
    .load("dbfs:/databricks-datasets/flights/departuredelays.csv/")
)
display(df)
df.printSchema()

root
 |-- date: integer (nullable = true)
 |-- delay: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- destination: string (nullable = true)



In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType

# Explicit schema for the departure-delays CSV: three integer fields followed by
# two string fields, all declared with nullable=False.
schema = StructType(
    [StructField(field_name, IntegerType(), False) for field_name in ("date", "delay", "distance")]
    + [StructField(field_name, StringType(), False) for field_name in ("origin", "destination")]
)

# Last expression of the cell: echo the schema object as the cell output.
schema

Out[5]: StructType([StructField('date', IntegerType(), False), StructField('delay', IntegerType(), False), StructField('distance', IntegerType(), False), StructField('origin', StringType(), False), StructField('destination', StringType(), False)])

In [0]:
# Read the same CSV again, this time applying the explicit `schema`
# instead of schema inference, then show the data and the resulting schema.
df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .schema(schema)
    .load("dbfs:/databricks-datasets/flights/departuredelays.csv/")
)

display(df)
df.printSchema()

root
 |-- date: integer (nullable = true)
 |-- delay: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- destination: string (nullable = true)



In [0]:
# Compare how the CSV reader treats rows that do not fit `schema` under each
# parse mode (PERMISSIVE, DROPMALFORMED, FAILFAST) and with badRecordsPath.
# NOTE(review): sep=";" does not match this file (the default-separator read
# earlier in the notebook yields five columns), so every row is malformed here -
# presumably deliberate for the demonstration; confirm.
flights_csv = "dbfs:/databricks-datasets/flights/departuredelays.csv/"

df_permissive = (
    spark.read.format("csv").schema(schema)
    .options(sep=";", header=True, mode="PERMISSIVE")
    .load(flights_csv)
)
df_dropmal = (
    spark.read.format("csv").schema(schema)
    .options(sep=";", header=True, mode="DROPMALFORMED")
    .load(flights_csv)
)
df_failfast = (
    spark.read.format("csv").schema(schema)
    .options(sep=";", header=True, mode="FAILFAST")
    .load(flights_csv)
)
df_badrecords = (
    spark.read.format("csv").schema(schema)
    .options(sep=";", header=True)
    .option("badRecordsPath", "/mnt/source/badrecords")
    .load(flights_csv)
)

display(df_permissive)
display(df_dropmal)
# Reading in FAILFAST mode raises on malformed input. Catch the error and show it,
# so the failure is part of the demonstration instead of aborting the cell before
# df_badrecords is displayed.
try:
    display(df_failfast)
except Exception as exc:  # concrete exception class depends on the Spark runtime
    # Truncate: Spark error messages can carry a full JVM stack trace.
    print(f"FAILFAST read failed: {type(exc).__name__}: {str(exc)[:500]}")
display(df_badrecords)

date,delay,distance,origin,destination
,,,,
,,,,
,,,,
,,,,
,,,,
,,,,
,,,,
,,,,
,,,,
,,,,


date,delay,distance,origin,destination


In [0]:
# Persist the flights DataFrame as Parquet (overwriting any previous run) and
# read it back. The CSV-only options header/sep are not passed: Parquet stores
# column names and types in the file itself, so those options had no effect.
df.write.format("parquet").mode("overwrite").save("dbfs:/tmp/yeyyy.parquet")

df_test = spark.read.format("parquet").schema(schema).load("dbfs:/tmp/yeyyy.parquet")
display(df_test)

date,delay,distance,origin,destination
1021738,54,599,IAH,ATL
1021708,169,793,IAH,CLT
1022009,57,175,IAH,LFT
1022058,7,175,IAH,CRP
1020738,-5,898,IAH,MSP
1021421,73,906,IAH,RDU
1020800,0,571,IAH,BNA
1021908,4,373,IAH,TUL
1020817,-5,489,IAH,BHM
1022058,118,268,IAH,BRO
