In [3]:
!pip --version
!pip install pyspark py4j

pip 24.1.2 from /usr/local/lib/python3.11/dist-packages/pip (python 3.11)


In [158]:
from pyspark.sql import SparkSession
from pyspark.sql.types import DateType
from pyspark.sql.functions import col, to_date, avg, sum as _sum, count as _count, year

# Создание объекта SparkSession
spark = SparkSession.builder \
    .appName('MySparkApp') \
    .config('spark.master', 'local[*]') \
    .getOrCreate()


In [159]:
df_authors = spark.read.csv('/content/sample_data/authors.csv', header=True, inferSchema=True)
df_books = spark.read.csv('/content/sample_data/books.csv', header=True, inferSchema=True)

df_authors.show(5)
df_books.show(5)


+---------+--------+----------+-------+
|author_id|    name|birth_date|country|
+---------+--------+----------+-------+
|        1|Author_1|1960-12-31|  India|
|        2|Author_2|1965-12-31| Canada|
|        3|Author_3|1970-12-31|    USA|
|        4|Author_4|1975-12-31|     UK|
|        5|Author_5|1980-12-31|    USA|
+---------+--------+----------+-------+
only showing top 5 rows

+-------+------+---------+-----------+-----+------------+
|book_id| title|author_id|      genre|price|publish_date|
+-------+------+---------+-----------+-----+------------+
|      1|Book_1|        2|    Mystery|73.57|  1980-12-31|
|      2|Book_2|        1|Non-Fiction| 41.1|  1982-12-31|
|      3|Book_3|       10|    Fiction|10.63|  1984-12-31|
|      4|Book_4|        9|Non-Fiction|46.31|  1986-12-31|
|      5|Book_5|        7|    Science|31.13|  1988-12-31|
+-------+------+---------+-----------+-----+------------+
only showing top 5 rows



In [160]:
df_authors.printSchema()
df_books.printSchema()

root
 |-- author_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- birth_date: date (nullable = true)
 |-- country: string (nullable = true)

root
 |-- book_id: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- author_id: integer (nullable = true)
 |-- genre: string (nullable = true)
 |-- price: double (nullable = true)
 |-- publish_date: date (nullable = true)



In [161]:
# Метод dtypes возвращает типы данных всех столбцов в DataFrame в виде объекта
# типа pyspark.sql.types.StructType, который можно представить в виде словаря,
# где ключами являются названия столбцов, а значениями — типы данных.

print(df_authors.dtypes) # вернет список картежей
print(df_authors.dtypes[2]) # обратимся по индексу к определенному кортежу
print(f' обект - {df_authors.dtypes[2][1]} относится к классу {type(df_authors.dtypes[2][1])}')
print('-'*100)

#Метод schema возвращает полную схему DataFrame в виде объекта типа StructType,
#который описывает структуру данных, включая названия столбцов,
#их типы данных и информацию о том, могут ли значения быть null.
print(df_authors.schema)
print(df_authors.schema['birth_date'].dataType)

[('author_id', 'int'), ('name', 'string'), ('birth_date', 'date'), ('country', 'string')]
('birth_date', 'date')
 обект - date относится к классу <class 'str'>
----------------------------------------------------------------------------------------------------
StructType([StructField('author_id', IntegerType(), True), StructField('name', StringType(), True), StructField('birth_date', DateType(), True), StructField('country', StringType(), True)])
DateType()


In [162]:
# Преобразовать столбцы publish_date и birth_date в формат даты.
if not isinstance(df_authors.schema['birth_date'].dataType, DateType):
# 1 способ (подойдет, если данные в колонке записаны в ожидаемой последовательности
# yyyy-MM-dd)
    df_authors = df_authors.withColumn('birth_date', col('birth_date').cast(DateType()))


if not isinstance(df_books.schema['publish_date'].dataType, DateType):
# 2 способ
    df_books = df_books.withColumn("publish_date", to_date(col("publish_date"), "yyyy-MM-dd"))


In [163]:
# Объедините таблицы books и authors по author_id.
df_books_j_authors = df_books.join(df_authors, on='author_id', how='left')
df_books_j_authors.show(5)


+---------+-------+------+-----------+-----+------------+---------+----------+---------+
|author_id|book_id| title|      genre|price|publish_date|     name|birth_date|  country|
+---------+-------+------+-----------+-----+------------+---------+----------+---------+
|        2|      1|Book_1|    Mystery|73.57|  1980-12-31| Author_2|1965-12-31|   Canada|
|        1|      2|Book_2|Non-Fiction| 41.1|  1982-12-31| Author_1|1960-12-31|    India|
|       10|      3|Book_3|    Fiction|10.63|  1984-12-31|Author_10|2005-12-31|    India|
|        9|      4|Book_4|Non-Fiction|46.31|  1986-12-31| Author_9|2000-12-31|Australia|
|        7|      5|Book_5|    Science|31.13|  1988-12-31| Author_7|1990-12-31|      USA|
+---------+-------+------+-----------+-----+------------+---------+----------+---------+
only showing top 5 rows



In [164]:
#топ-5 авторов, книги которых принесли наибольшую выручку.
df_top_5_authors = df_books_j_authors.groupBy(col('author_id'), col('name')) \
  .agg(_sum('price').alias('total_revenue'))\
  .select(col('author_id'),col('name'), col('total_revenue')) \
  .orderBy(col('total_revenue').desc()).limit(5)

top_5_authors.show()

Py4JJavaError: An error occurred while calling o1142.showString.
: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:

org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
py4j.Gateway.invoke(Gateway.java:238)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
py4j.ClientServerConnection.run(ClientServerConnection.java:106)
java.base/java.lang.Thread.run(Thread.java:829)

The currently active SparkContext was created at:

org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
py4j.Gateway.invoke(Gateway.java:238)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
py4j.ClientServerConnection.run(ClientServerConnection.java:106)
java.base/java.lang.Thread.run(Thread.java:829)
         
	at org.apache.spark.SparkContext.assertNotStopped(SparkContext.scala:122)
	at org.apache.spark.SparkContext.defaultParallelism(SparkContext.scala:2702)
	at org.apache.spark.sql.execution.adaptive.CoalesceShufflePartitions.$anonfun$apply$1(CoalesceShufflePartitions.scala:61)
	at scala.runtime.java8.JFunction0$mcI$sp.apply(JFunction0$mcI$sp.java:23)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.execution.adaptive.CoalesceShufflePartitions.apply(CoalesceShufflePartitions.scala:58)
	at org.apache.spark.sql.execution.adaptive.CoalesceShufflePartitions.apply(CoalesceShufflePartitions.scala:34)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$optimizeQueryStage$2(AdaptiveSparkPlanExec.scala:169)
	at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
	at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
	at scala.collection.immutable.List.foldLeft(List.scala:91)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.optimizeQueryStage(AdaptiveSparkPlanExec.scala:168)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.newQueryStage(AdaptiveSparkPlanExec.scala:591)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:540)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$createQueryStages$2(AdaptiveSparkPlanExec.scala:580)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:580)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$createQueryStages$2(AdaptiveSparkPlanExec.scala:580)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:580)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$createQueryStages$2(AdaptiveSparkPlanExec.scala:580)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:580)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:536)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$createQueryStages$2(AdaptiveSparkPlanExec.scala:580)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:580)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$createQueryStages$2(AdaptiveSparkPlanExec.scala:580)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:580)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:277)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:272)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:419)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:392)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4333)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3316)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4323)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4321)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4321)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:3316)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3539)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:280)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:315)
	at jdk.internal.reflect.GeneratedMethodAccessor92.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)


In [165]:
# количество книг в каждом жанре.

df_counter_books_gen = df_books.groupBy(col('genre')).agg(_count(col('book_id')).alias('count'))\
    .orderBy(col('count').desc())

df_counter_books_gen.show()


+-----------+-----+
|      genre|count|
+-----------+-----+
|Non-Fiction|    9|
|    Science|    3|
|    Fiction|    3|
|    Fantasy|    3|
|    Mystery|    2|
+-----------+-----+



In [166]:
# Подсчитайте средняя цена книг по каждому автору.
df_avg_price = df_books_j_authors.groupBy(col('author_id'),col('name')) \
  .agg(avg(col('price')).alias('average_price')) \
  .orderBy(col('average_price').desc())

df_avg_price.show()

+---------+---------+-----------------+
|author_id|     name|    average_price|
+---------+---------+-----------------+
|        5| Author_5|            88.83|
|        4| Author_4|             83.7|
|        2| Author_2|          57.9925|
|        9| Author_9|            46.31|
|        7| Author_7|            44.22|
|        6| Author_6|           43.965|
|        1| Author_1|37.28666666666667|
|        8| Author_8|            35.72|
|       10|Author_10|           21.165|
+---------+---------+-----------------+



In [167]:
#Найдите книги, опубликованные после 2000 года, и отсортируйте их по цене.

df_books_after_2000 = df_books_j_authors.filter(year('publish_date') > 2000).orderBy(col('price').desc())
df_books_after_2000.show()

+---------+-------+-------+-----------+-----+------------+--------+----------+---------+
|author_id|book_id|  title|      genre|price|publish_date|    name|birth_date|  country|
+---------+-------+-------+-----------+-----+------------+--------+----------+---------+
|        7|     20|Book_20|    Mystery|91.48|  2018-12-31|Author_7|1990-12-31|      USA|
|        5|     19|Book_19|    Science|88.83|  2016-12-31|Author_5|1980-12-31|      USA|
|        8|     15|Book_15|    Fantasy| 60.0|  2008-12-31|Author_8|1995-12-31|Australia|
|        6|     17|Book_17|    Fantasy|47.57|  2012-12-31|Author_6|1985-12-31|      USA|
|        1|     18|Book_18|Non-Fiction|43.92|  2014-12-31|Author_1|1960-12-31|    India|
|        2|     16|Book_16|    Fiction|36.22|  2010-12-31|Author_2|1965-12-31|   Canada|
|        8|     12|Book_12|Non-Fiction|31.02|  2002-12-31|Author_8|1995-12-31|Australia|
|        1|     14|Book_14|    Fiction|26.84|  2006-12-31|Author_1|1960-12-31|    India|
|        8|     13|Bo

In [155]:
spark.stop()