
In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [None]:
!ls

sample_data  spark-3.1.1-bin-hadoop3.2	spark-3.1.1-bin-hadoop3.2.tgz


In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
spark

In Python, the columns of a DataFrame can be accessed either by attribute (df.age) or by index (df['age']). The first form is convenient for interactive data exploration, but users are strongly encouraged to use the second form, which is future-proof and does not break on column names that are also attributes of the DataFrame class.
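
The naming collision behind this advice can be reproduced without Spark. The sketch below uses a hypothetical TinyFrame class (an assumption for illustration, not part of PySpark) that has a count method and also holds a column named "count". Attribute access finds the method first, while index access still reaches the column.

```python
class TinyFrame:
    """Hypothetical stand-in for a DataFrame; not part of PySpark."""

    def __init__(self, columns):
        self._columns = dict(columns)

    def count(self):
        # A regular method, similar in spirit to DataFrame.count(): number of rows.
        return len(next(iter(self._columns.values()), []))

    def __getattr__(self, name):
        # Called only when normal attribute lookup fails,
        # so it never sees names that are already methods (e.g. "count").
        try:
            return self._columns[name]
        except KeyError:
            raise AttributeError(name) from None

    def __getitem__(self, name):
        # Index access always looks in the column dictionary.
        return self._columns[name]


tf = TinyFrame({"age": [30, 19], "count": [7, 9]})

print(tf.age)       # attribute access works for a non-colliding name
print(tf["count"])  # index access returns the column
print(tf.count)     # attribute access returns the bound method instead
```

This is why df['count'] keeps working even if a column shares its name with a method, whereas df.count would not return the column.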

In [None]:
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

# spark is an existing SparkSession
df = spark.read.json("/content/spark-3.1.1-bin-hadoop3.2/examples/src/main/resources/people.json")

# spark, df are from the previous example
# Print the schema in a tree format
df.printSchema()

# Select only the "name" column
df.select("name").show()

# Select everybody, but increment the age by 1
df.select(df['name'], df['age'] + 1).show()

# Select people older than 21
df.filter(df['age'] > 21).show()

# Count people by age
df.groupBy("age").count().show()


root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+

+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+

+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+



The sql function on a SparkSession lets applications run SQL queries
programmatically and returns the result as a DataFrame.

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession \
 .builder \
 .appName("Python Spark SQL basic example") \
 .config("spark.some.config.option", "some-value") \
 .getOrCreate()
# spark is an existing SparkSession
df = spark.read.json("/content/spark-3.1.1-bin-hadoop3.2/examples/src/main/resources/people.json")
# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")
sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



A global temporary view is tied to the system-preserved database
global_temp, and we must use the qualified name to refer to it,
for example SELECT * FROM global_temp.view1.

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession \
 .builder \
 .appName("Python Spark SQL basic example") \
 .config("spark.some.config.option", "some-value") \
 .getOrCreate()
# spark is an existing SparkSession
df = spark.read.json("/content/spark-3.1.1-bin-hadoop3.2/examples/src/main/resources/people.json")
# Register the DataFrame as a global temporary view
df.createOrReplaceGlobalTempView("people")  # safe to re-run; createGlobalTempView raises if the view already exists
# Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
# Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



Spark SQL can convert an RDD of Row objects to a DataFrame, inferring the
data types. Rows are constructed by passing a list of key/value pairs as
kwargs to the Row class. The keys of this list define the column names of
the table, and the types are inferred by sampling the whole dataset, similar
to the inference that is performed on JSON files.

In [None]:
spark = SparkSession \
 .builder \
 .appName("Python Spark SQL basic example") \
 .config("spark.some.config.option", "some-value") \
 .getOrCreate()
from pyspark.sql import Row
sc = spark.sparkContext
# Load a text file and convert each line to a Row.
lines = sc.textFile("/content/spark-3.1.1-bin-hadoop3.2/examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
# Infer the schema, and register the DataFrame as a table.
schemaPeople = spark.createDataFrame(people)
schemaPeople.createOrReplaceTempView("people")
# SQL can be run over DataFrames that have been registered as a table.
teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
# The results of SQL queries are Dataframe objects.
# rdd returns the content as an :class:`pyspark.RDD` of :class:`Row`.
teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect()
for name in teenNames:
    print(name)

Name: Justin
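
To make the idea of inferring column types from key/value records more concrete, here is a simplified, Spark-free sketch. It is not Spark's actual inference algorithm: infer_schema is a hypothetical helper that scans every record, ignores missing values, and reports one Python type name per key. The records reuse the people data shown earlier.

```python
def infer_schema(records):
    """Hypothetical helper: map each key to the Python type name seen for it."""
    seen = {}
    for record in records:
        for key, value in record.items():
            types = seen.setdefault(key, set())
            # None only marks the column as nullable; it does not decide the type.
            if value is not None:
                types.add(type(value).__name__)
    # Collapse to one type per column; fall back to "str" on conflicts
    # or when a column contains nothing but None.
    return {
        key: next(iter(types)) if len(types) == 1 else "str"
        for key, types in seen.items()
    }


records = [
    {"name": "Michael", "age": None},
    {"name": "Andy", "age": 30},
    {"name": "Justin", "age": 19},
]
print(infer_schema(records))
```

The missing age in the first record does not prevent the column from being typed as an integer, which matches the nullable long column that Spark reports for people.json.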


The schema is represented by a StructType that matches the structure of
the tuples or lists in the RDD.

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession \
 .builder \
 .appName("Python Spark SQL basic example") \
 .config("spark.some.config.option", "some-value") \
 .getOrCreate()
# Import data types
from pyspark.sql.types import StringType, StructType, StructField
sc = spark.sparkContext
# Load a text file and convert each line to a Row.
lines = sc.textFile("/content/spark-3.1.1-bin-hadoop3.2/examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
# Each line is converted to a tuple.
people = parts.map(lambda p: (p[0], p[1].strip()))
# The schema is encoded in a string.
schemaString = "name age"
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)
# Apply the schema to the RDD.
schemaPeople = spark.createDataFrame(people, schema)
# Creates a temporary view using the DataFrame
schemaPeople.createOrReplaceTempView("people")
# SQL can be run over DataFrames that have been registered as a table.
results = spark.sql("SELECT name FROM people")
results.show()

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+



In [None]:
df.createGlobalTempView("daily")

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession \
 .builder \
 .appName("Python Spark SQL basic example") \
 .config("spark.some.config.option", "some-value") \
 .getOrCreate()
# spark is an existing SparkSession
df = spark.read.option("multiline","true").json("/content/daily_json.js")
df.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+
|                Date|        PreviousDate|         PreviousURL|           Timestamp|              Valute|
+--------------------+--------------------+--------------------+--------------------+--------------------+
|2023-05-12T11:30:...|2023-05-11T11:30:...|//www.cbr-xml-dai...|2023-05-11T20:00:...|{{AED, R01230, Ди...|
+--------------------+--------------------+--------------------+--------------------+--------------------+



In [None]:
df.select("Valute").show()

+--------------------+
|              Valute|
+--------------------+
|{{AED, R01230, Ди...|
+--------------------+

In [None]:
from pyspark.sql import SparkSession, functions as F
from urllib.request import urlopen

spark = SparkSession.builder.getOrCreate()

url = 'https://www.cbr-xml-daily.ru/daily_json.js'
jsonData = urlopen(url).read().decode('utf-8')
rdd = spark.sparkContext.parallelize([jsonData])
dfurl = spark.read.json(rdd)
dfurl.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+
|                Date|        PreviousDate|         PreviousURL|           Timestamp|              Valute|
+--------------------+--------------------+--------------------+--------------------+--------------------+
|2023-05-12T11:30:...|2023-05-11T11:30:...|//www.cbr-xml-dai...|2023-05-11T20:00:...|{{AED, R01230, Ди...|
+--------------------+--------------------+--------------------+--------------------+--------------------+



In [None]:
from pyspark.sql import SparkSession, functions as F
from urllib.request import urlopen
spark = SparkSession.builder.getOrCreate()
url = 'https://www.cbr-xml-daily.ru/daily_json.js'
jsonData = urlopen(url).read().decode('utf-8')
rdd = spark.sparkContext.parallelize([jsonData])
# The JSON reader infers the schema on its own; each RDD element is parsed as one JSON document
df = spark.read.json(rdd)
df.printSchema()
df.select("Valute.*").show(1,truncate=90,vertical=True)
df.select("Valute.XDR.*").show(1,truncate=90,vertical=True)
df.select("Valute.XDR.*").show(1,truncate=90,vertical=False)

root
 |-- Date: string (nullable = true)
 |-- PreviousDate: string (nullable = true)
 |-- PreviousURL: string (nullable = true)
 |-- Timestamp: string (nullable = true)
 |-- Valute: struct (nullable = true)
 |    |-- AED: struct (nullable = true)
 |    |    |-- CharCode: string (nullable = true)
 |    |    |-- ID: string (nullable = true)
 |    |    |-- Name: string (nullable = true)
 |    |    |-- Nominal: long (nullable = true)
 |    |    |-- NumCode: string (nullable = true)
 |    |    |-- Previous: double (nullable = true)
 |    |    |-- Value: double (nullable = true)
 |    |-- AMD: struct (nullable = true)
 |    |    |-- CharCode: string (nullable = true)
 |    |    |-- ID: string (nullable = true)
 |    |    |-- Name: string (nullable = true)
 |    |    |-- Nominal: long (nullable = true)
 |    |    |-- NumCode: string (nullable = true)
 |    |    |-- Previous: double (nullable = true)
 |    |    |-- Value: double (nullable = true)
 |    |-- AUD: struct (nullable = true)
 |    |
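
Selecting Valute.* expands the struct into one column per currency, which gives a wide table. If one row per currency is preferred, one option is to reshape the parsed JSON in plain Python before handing it to Spark. The following is a minimal sketch under stated assumptions: the sample is trimmed to two currencies and a few of the fields from the schema above, every value in it is a placeholder rather than a real exchange rate, and valute_rows is a hypothetical helper.

```python
import json

# Trimmed sample shaped like the schema printed above.
# Every value here is a placeholder, not a real exchange rate.
sample = """
{
  "Valute": {
    "AED": {"CharCode": "AED", "Nominal": 1, "Value": 1.0, "Previous": 1.0},
    "AMD": {"CharCode": "AMD", "Nominal": 100, "Value": 2.0, "Previous": 2.5}
  }
}
"""


def valute_rows(document):
    """Turn the nested Valute object into one flat dict per currency."""
    rows = []
    for code, fields in document["Valute"].items():
        row = {"code": code}
        row.update(fields)
        # Difference between the current and the previous quoted value.
        row["change"] = fields["Value"] - fields["Previous"]
        rows.append(row)
    return rows


rows = valute_rows(json.loads(sample))
for row in rows:
    print(row["code"], row["Value"], row["change"])
```

A list of flat dictionaries like this could then be passed to spark.createDataFrame(rows) to obtain a long-format DataFrame.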

In [None]:
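from pyspark import SparkContext, SparkConf

appName = 'appName'
master = 'local[*]'
conf = SparkConf().setAppName(appName).setMaster(master)
# Reuse the running SparkContext if there is one instead of stopping it
sc = SparkContext.getOrCreate(conf)

a = [('g1', 2), ('g2', 4), ('g3', 3), ('g4', 8)]
rdd = sc.parallelize(a)
print(rdd.collect())

# Sort by key (the group name); avoid naming the result "sorted", which shadows the built-in
by_key = rdd.sortByKey()
print(by_key.collect())

# Swap each pair to (value, name) so that the number becomes the key
rdd2 = rdd.map(lambda s: (s[1], s[0]))
print(rdd2.collect())

# Ascending, descending, then ascending again
by_value = rdd2.sortByKey()
print(by_value.collect())
by_value_desc = rdd2.sortByKey(False)
print(by_value_desc.collect())
by_value = rdd2.sortByKey()
print(by_value.collect())

# Attach a zero-based position to every element
indices = by_value.zipWithIndex()
print(indices.collect())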

[('g1', 2), ('g2', 4), ('g3', 3), ('g4', 8)]
[('g1', 2), ('g2', 4), ('g3', 3), ('g4', 8)]
[(2, 'g1'), (4, 'g2'), (3, 'g3'), (8, 'g4')]
[(2, 'g1'), (3, 'g3'), (4, 'g2'), (8, 'g4')]
[(8, 'g4'), (4, 'g2'), (3, 'g3'), (2, 'g1')]
[(2, 'g1'), (3, 'g3'), (4, 'g2'), (8, 'g4')]
[((2, 'g1'), 0), ((3, 'g3'), 1), ((4, 'g2'), 2), ((8, 'g4'), 3)]
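
The printed results above can be cross-checked without a Spark context. The sketch below mirrors the RDD steps on a plain Python list, which is a local approximation and not how Spark executes them: sorted stands in for sortByKey, a list comprehension for the key/value swap, and enumerate for zipWithIndex.

```python
pairs = [('g1', 2), ('g2', 4), ('g3', 3), ('g4', 8)]

# Counterpart of rdd.sortByKey(): order by the first element of each pair.
by_key = sorted(pairs, key=lambda kv: kv[0])

# Swap key and value, as the rdd2 step does.
swapped = [(value, key) for key, value in pairs]

# Counterparts of rdd2.sortByKey() and rdd2.sortByKey(False).
ascending = sorted(swapped, key=lambda kv: kv[0])
descending = sorted(swapped, key=lambda kv: kv[0], reverse=True)

# Counterpart of zipWithIndex(): pair each element with its position.
indexed = [(item, i) for i, item in enumerate(ascending)]

print(by_key)
print(ascending)
print(descending)
print(indexed)
```

The four printed lists agree with the corresponding lines of the Spark output, which is a quick way to confirm what each transformation is expected to return.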
