In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StringType, StructType, LongType, TimestampType, DecimalType

In [2]:
# Obtain the SparkSession for this notebook under the application name
# "Operations"; getOrCreate() reuses an already active session if one exists.
spark = SparkSession.builder.appName("Operations").getOrCreate()

In [3]:
# Load the CSV file into a Spark DataFrame.
#   spark - the SparkSession variable
#   read  - the reader accessor of the session
#   csv   - the source format (parquet is more common in practice)
# header=True takes the column names from the first line of the file;
# inferSchema=True lets Spark infer column types instead of using strings.
df = spark.read.csv('apple_stock_data.csv', header=True, inferSchema=True)


In [5]:
# Print the column names, types and nullability of the loaded DataFrame.
df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [14]:
# Filter with a SQL expression string and preview the first 3 matching rows.
(
    df
    .filter("Close>500")
    .show(3)
)

+----------+------+------+------+------+--------+---------+
|      Date|  Open|  High|   Low| Close|  Volume|Adj Close|
+----------+------+------+------+------+--------+---------+
|2012-03-30|608.77|610.56|597.94|599.55|26050900|   599.55|
|2012-03-29|612.78|616.56|607.23|609.86|21668300|   609.86|
|2012-03-28|618.38|621.45|610.31|617.62|23385200|   617.62|
+----------+------+------+------+------+--------+---------+
only showing top 3 rows


In [13]:
# SQL-string filter, then keep only two columns before previewing 3 rows.
(
    df
    .filter("Close < 500")
    .select('Open', 'Close')
    .show(3)
)

+------+------+
|  Open| Close|
+------+------+
|514.26|497.67|
|490.96|493.42|
|480.76|493.17|
+------+------+
only showing top 3 rows


In [15]:
# select() takes column names either as separate arguments or as one list;
# the separate-argument form is used here.
(
    df
    .filter("Close > 450")
    .select('Open', 'Close', 'High')
    .show(3)
)

+------+------+------+
|  Open| Close|  High|
+------+------+------+
|608.77|599.55|610.56|
|612.78|609.86|616.56|
|618.38|617.62|621.45|
+------+------+------+
only showing top 3 rows


In [16]:
# Filter with a Column expression - a construct similar to the one in pandas.
df.filter(df["Close"] > 500).show(3)

+----------+------+------+------+------+--------+---------+
|      Date|  Open|  High|   Low| Close|  Volume|Adj Close|
+----------+------+------+------+------+--------+---------+
|2012-03-30|608.77|610.56|597.94|599.55|26050900|   599.55|
|2012-03-29|612.78|616.56|607.23|609.86|21668300|   609.86|
|2012-03-28|618.38|621.45|610.31|617.62|23385200|   617.62|
+----------+------+------+------+------+--------+---------+
only showing top 3 rows


In [30]:
# Combine two Column conditions with & (logical AND). Each comparison needs
# its own parentheses because & binds tighter than < and > in Python.
d1 = df.filter((df["Close"] < 200) & (df["Open"] > 200))

In [33]:
# Report how many rows passed the filter, then display them.
# count() and show() are separate actions, so each one evaluates the filter.
print(f"Количество строк: {d1.count()}")
d1.show()

Количество строк: 8
+----------+------+------+------+------+--------+---------+
|      Date|  Open|  High|   Low| Close|  Volume|Adj Close|
+----------+------+------+------+------+--------+---------+
|2010-01-29|201.08| 202.2|190.25|192.06|44498300|   192.06|
|2010-01-28|204.93| 205.5| 198.7|199.29|41910800|   199.29|
|2010-01-22|206.78| 207.5|197.16|197.75|31491700|   197.75|
|2009-12-01|202.24|202.77|196.83|196.97|16634400|   196.97|
|2009-11-30|201.11|201.68|198.77|199.91|15173500|   199.91|
|2009-10-27|201.66|202.81|196.45|197.37|27019700|   197.37|
|2009-10-20| 200.6|201.75|197.85|198.76|40751400|   198.76|
|2007-12-28|200.59|201.56|196.88|199.83|24987400|   199.83|
+----------+------+------+------+------+--------+---------+



In [26]:
# Combine the conditions with | (logical OR): a row passes if either holds.
d = df.filter((df["Close"] < 200) | (df["Open"] > 200))

print(d.count())

6946


In [34]:
# Print the total number of rows in the unfiltered DataFrame.
print(f'Количество строк в df {df.count()}')

Колисество строк в df 6953


In [35]:
# Negate a Column condition with ~. Python's `not` does not work on Column
# objects, and `!` is only valid inside a SQL expression string.
# ~(Open < 200) also keeps rows where Open == 200, so it is not the same
# condition as Open > 200.
df.filter((df["Close"] < 200) & ~(df["Open"] < 200)).show()

+----------+------+------+------+------+--------+---------+
|      Date|  Open|  High|   Low| Close|  Volume|Adj Close|
+----------+------+------+------+------+--------+---------+
|2010-02-23| 200.0|201.33|195.71|197.06|20539100|   197.06|
|2010-01-29|201.08| 202.2|190.25|192.06|44498300|   192.06|
|2010-01-28|204.93| 205.5| 198.7|199.29|41910800|   199.29|
|2010-01-22|206.78| 207.5|197.16|197.75|31491700|   197.75|
|2009-12-01|202.24|202.77|196.83|196.97|16634400|   196.97|
|2009-11-30|201.11|201.68|198.77|199.91|15173500|   199.91|
|2009-10-27|201.66|202.81|196.45|197.37|27019700|   197.37|
|2009-10-20| 200.6|201.75|197.85|198.76|40751400|   198.76|
|2007-12-28|200.59|201.56|196.88|199.83|24987400|   199.83|
+----------+------+------+------+------+--------+---------+



In [36]:
# Exact equality on a double column: only rows whose stored value is exactly
# the same double as the literal 197.16 will match.
(
    df
    .filter(df["Low"] == 197.16)
    .show()
)

+----------+------+-----+------+------+--------+---------+
|      Date|  Open| High|   Low| Close|  Volume|Adj Close|
+----------+------+-----+------+------+--------+---------+
|2010-01-22|206.78|207.5|197.16|197.75|31491700|   197.75|
+----------+------+-----+------+------+--------+---------+



In [37]:
# Collect the data onto the local machine. !The most expensive operation!
# The result is a Python list of Row objects.
(
    df
    .filter(df["Low"] == 197.16)
    .collect()
)

[Row(Date=datetime.date(2010, 1, 22), Open=206.78, High=207.5, Low=197.16, Close=197.75, Volume=31491700, Adj Close=197.75)]

In [40]:
# Keep the collected rows in a local list for further inspection.
result = df.filter(df["Low"] == 197.16).collect()

In [41]:
# Inspect the type of a single collected element.
type(result[0])

pyspark.sql.types.Row

In [42]:
# Bind the first collected element to a shorter name.
row = result[0]

In [43]:
# Convert the Row object into a plain dict.
row.asDict()

{'Date': datetime.date(2010, 1, 22),
 'Open': 206.78,
 'High': 207.5,
 'Low': 197.16,
 'Close': 197.75,
 'Volume': 31491700,
 'Adj Close': 197.75}

In [44]:
# The Row object is iterable, so all of its values can be walked through in order.
for item in result[0]:
    print(item)

2010-01-22
206.78
207.5
197.16
197.75
31491700
197.75


In [45]:
# Same iteration as over result[0], this time through the `row` variable.
for i in row:
    print(i)

2010-01-22
206.78
207.5
197.16
197.75
31491700
197.75
