In [1]:
from pyspark.sql import types
import os
import sys
import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql.functions import col, date_trunc

In [2]:
credentials_location = '/home/timur/ZOOMDE_PROJECT/keys/project-24508-356bfc7cf395.json'

conf = SparkConf() \
    .setMaster('local[*]') \
    .setAppName('test') \
    .set("spark.jars", "./lib/gcs-connector-hadoop3-2.2.5.jar") \
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credentials_location)

In [3]:
sc = SparkContext(conf=conf)
hadoop_conf = sc._jsc.hadoopConfiguration()
hadoop_conf.set("fs.AbstractFileSystem.gs.impl",  "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoop_conf.set("fs.gs.auth.service.account.json.keyfile", credentials_location)
hadoop_conf.set("fs.gs.auth.service.account.enable", "true")

23/04/27 01:52:27 WARN Utils: Your hostname, timur-IdeaPad-5-14ALC05 resolves to a loopback address: 127.0.1.1; using 192.168.88.253 instead (on interface wlo1)
23/04/27 01:52:27 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
23/04/27 01:52:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
spark = SparkSession.builder \
    .config(conf=sc.getConf()) \
    .getOrCreate()

In [5]:
data = spark \
    .read \
        .format("parquet") \
        .option("header", "true") \
        .parquet("gs://dtc_data_lake_project-24508/data/year=2015/month=1/")

                                                                                

In [6]:
data.columns

['rental_id',
 'duration',
 'bike_id',
 'end_date',
 'end_station_id',
 'end_station_name',
 'start_date',
 'start_station_id',
 'start_station_name']

In [7]:
data.select(date_trunc("day", "start_date")).show()

[Stage 1:>                                                          (0 + 1) / 1]

+---------------------------+
|date_trunc(day, start_date)|
+---------------------------+
|        2015-01-17 00:00:00|
|        2015-01-17 00:00:00|
|        2015-01-17 00:00:00|
|        2015-01-17 00:00:00|
|        2015-01-17 00:00:00|
|        2015-01-17 00:00:00|
|        2015-01-17 00:00:00|
|        2015-01-17 00:00:00|
|        2015-01-17 00:00:00|
|        2015-01-17 00:00:00|
|        2015-01-17 00:00:00|
|        2015-01-17 00:00:00|
|        2015-01-17 00:00:00|
|        2015-01-17 00:00:00|
|        2015-01-17 00:00:00|
|        2015-01-17 00:00:00|
|        2015-01-17 00:00:00|
|        2015-01-17 00:00:00|
|        2015-01-17 00:00:00|
|        2015-01-17 00:00:00|
+---------------------------+
only showing top 20 rows



                                                                                

In [8]:
data= data.select('rental_id',
 'duration',
 'bike_id',
 date_trunc("day", "end_date").alias("end_date"),
 'end_station_id',
 'end_station_name',
 date_trunc("day", "start_date").alias("start_date"),
 'start_station_id',
 'start_station_name')       

In [9]:
data.show()

[Stage 2:>                                                          (0 + 1) / 1]

+---------+--------+-------+-------------------+--------------+--------------------+-------------------+----------------+--------------------+
|rental_id|duration|bike_id|           end_date|end_station_id|    end_station_name|         start_date|start_station_id|  start_station_name|
+---------+--------+-------+-------------------+--------------+--------------------+-------------------+----------------+--------------------+
| 40628023|     600|   4226|2015-01-18 00:00:00|           485|Old Ford Road, Be...|2015-01-17 00:00:00|             692|Cadogan Close, Vi...|
| 40627889|    1740|  12539|2015-01-18 00:00:00|           293|Kensington Olympi...|2015-01-17 00:00:00|             109|  Soho Square , Soho|
| 40627985|     900|  11362|2015-01-18 00:00:00|           212|Campden Hill Road...|2015-01-17 00:00:00|             296|Knaresborough Pla...|
| 40627971|    1020|   4660|2015-01-18 00:00:00|           131|Eversholt Street ...|2015-01-17 00:00:00|             713|Hawley Crescent, ...|

                                                                                

In [16]:
data.registerTempTable('data')



In [17]:
df_2 = spark.sql("""
            SELECT end_date as date, 
            count(bike_id) as count_roads,
            ROUND(AVG(duration/60),1) as avg_duration_minutes
            FROM data
            Group BY  end_date 
            ORDER BY date ASC;
        """)

In [20]:
df_1 = spark.sql("""SELECT S.date,S.station_name,S.start_count,E.end_count
                                FROM
                                    (SELECT  start_date as date, 
                                    count(bike_id) as start_count,
                                    start_station_name as station_name
                                    FROM data
                                    Group BY   date,start_station_name
                                    ORDER BY date ASC) AS S
                                FULL OUTER JOIN
                                    (SELECT start_date as date, 
                                    count(bike_id) as end_count,
                                    end_station_name 
                                    FROM data
                                    Group BY   date,end_station_name
                                    ORDER BY date ASC) AS E
                                ON S.date = E.date AND S.station_name = E.end_station_name;
                                """)

In [21]:
df_1.show()



+-------------------+--------------------+-----------+---------+
|               date|        station_name|start_count|end_count|
+-------------------+--------------------+-----------+---------+
|2015-01-04 00:00:00|Abingdon Villas, ...|         20|       26|
|2015-01-04 00:00:00|  Ackroyd Drive, Bow|          6|        4|
|2015-01-04 00:00:00|Albert Bridge Roa...|         40|       52|
|2015-01-04 00:00:00|Albert Gate, Hyde...|         78|       86|
|2015-01-04 00:00:00|Alderney Street, ...|         38|       36|
|2015-01-04 00:00:00|Alfred Place, Blo...|         18|       18|
|2015-01-04 00:00:00|All Saints Church...|         40|       38|
|2015-01-04 00:00:00|All Saints' Road,...|         16|       16|
|2015-01-04 00:00:00|Alma Road, Wandsw...|         12|       10|
|2015-01-04 00:00:00|Altab Ali Park, W...|         58|       40|
|2015-01-04 00:00:00|Antill Road, Mile...|         12|       14|
|2015-01-04 00:00:00|Appold Street, Li...|         16|        8|
|2015-01-04 00:00:00|Ashl

                                                                                

In [24]:

df_1 = spark.sql("""SELECT start_date
       FROM data;
        """)

In [26]:
df_1.show()

[Stage 2:>                                                          (0 + 1) / 1]

+-------------------+
|         start_date|
+-------------------+
|2015-01-17 23:50:00|
|2015-01-17 23:31:00|
|2015-01-17 23:45:00|
|2015-01-17 23:43:00|
|2015-01-17 23:42:00|
|2015-01-17 21:20:00|
|2015-01-17 23:52:00|
|2015-01-17 23:49:00|
|2015-01-17 23:53:00|
|2015-01-17 23:56:00|
|2015-01-17 23:21:00|
|2015-01-17 23:23:00|
|2015-01-17 23:37:00|
|2015-01-17 23:46:00|
|2015-01-17 23:57:00|
|2015-01-17 23:45:00|
|2015-01-17 23:52:00|
|2015-01-17 23:16:00|
|2015-01-17 23:49:00|
|2015-01-17 23:55:00|
+-------------------+
only showing top 20 rows



                                                                                

In [14]:
dft\
    .write \
    .format("parquet")\
    .partitionBy("year", "month") \
    .mode("append") \
    .save("gs://dtc_data_lake_de-71680/data/")

AnalysisException: Partition column `year` not found in schema struct<rental_id:int,duration:int,bike_id:int,end_date:timestamp,end_station_id:int,end_station_name:string,start_date:timestamp,start_station_id:int,start_station_name:string>

In [7]:
data = spark \
    .read \
    .format("parquet") \
    .option("header", "true") \
    .parquet("gs://dtc_data_lake_de-71680/data/year=2016/mount=1/")

23/04/26 23:57:25 WARN FileStreamSink: Assume no metadata directory. Error while looking for metadata directory in the path: gs://dtc_data_lake_de-71680/data/year=2016/mount=1/.
java.lang.RuntimeException: java.lang.ClassNotFoundException: Class com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem not found
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2688)
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3431)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
	at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
	at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:53)
	at org.apache.spark.sql.executio

Py4JJavaError: An error occurred while calling o107.parquet.
: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem not found
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2688)
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3431)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
	at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$1(DataSource.scala:752)
	at scala.collection.immutable.List.map(List.scala:293)
	at org.apache.spark.sql.execution.datasources.DataSource$.checkAndGlobPathIfNecessary(DataSource.scala:750)
	at org.apache.spark.sql.execution.datasources.DataSource.checkAndGlobPathIfNecessary(DataSource.scala:579)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:408)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:228)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:210)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:210)
	at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:562)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: Class com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem not found
	at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2592)
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2686)
	... 29 more


In [7]:
data.show()

[Stage 1:>                                                          (0 + 1) / 1]

+---------+--------+-------+-------------------+-------------+--------------------+-------------------+---------------+--------------------+
|Rental Id|Duration|Bike Id|           End Date|EndStation Id|     EndStation Name|         Start Date|StartStation Id|   StartStation Name|
+---------+--------+-------+-------------------+-------------+--------------------+-------------------+---------------+--------------------+
| 50754225|     240|  11834|2016-01-10 00:04:00|          383|  Frith Street, Soho|2016-01-10 00:00:00|             18|Drury Lane, Coven...|
| 50754226|     300|   9648|2016-01-10 00:05:00|          719|Victoria Park Roa...|2016-01-10 00:00:00|            479|Pott Street, Beth...|
| 50754227|    1200|  10689|2016-01-10 00:20:00|          272|Baylis Road, Wate...|2016-01-10 00:00:00|            425|Harrington Square...|
| 50754228|     780|   8593|2016-01-10 00:14:00|          471|Hewison Street, O...|2016-01-10 00:01:00|            487|Canton Street, Po...|
| 50754229|  

                                                                                

In [10]:
df_1 = data.select(
    ["EndStation Id", "EndStation Name"])

df_2 = data.select(
    ["StartStation Id", "StartStation Name"])

df_2 = df_2.withColumnRenamed("StartStation Id", "Station_Id")\
       .withColumnRenamed("StartStation Name", "Station_Name")

df_1 = df_1.withColumnRenamed("EndStation Id", "Station_Id")\
       .withColumnRenamed("EndStation Name", "Station_Name")


station = df_1.union(df_2)

station.registerTempTable('station')


df_result = spark.sql("""
SELECT 
    Station_Id, 
    Station_Name
FROM
    station
GROUP BY 
  1,2
""")



In [11]:
df_result.show()



+----------+--------------------+
|Station_Id|        Station_Name|
+----------+--------------------+
|       520|Bancroft Road, Be...|
|       352|Vauxhall Street, ...|
|       368|Harriet Street, K...|
|       467| Southern Grove, Bow|
|       740|Sirdar Road, Avon...|
|       402|Penfold Street, M...|
|       750|Culvert Road, Bat...|
|       686|Beryl Road, Hamme...|
|       433|Wren Street, Holborn|
|       764|St. John's Road, ...|
|       105|Westbourne Grove,...|
|       748|Hertford Road, De...|
|       677|Heath Road, Batte...|
|       729|St. Peter's Terra...|
|       312|Grove End Road, S...|
|       456|Parkway, Camden Town|
|       144|Kennington Cross,...|
|        57|Guilford Street ,...|
|       247|St. John's Wood C...|
|       448|Fisherman's Walk ...|
+----------+--------------------+
only showing top 20 rows



                                                                                

In [6]:
df.write \
  .format("bigquery") \
  .option("writeMethod", "direct") \
  .save()

NameError: name 'df' is not defined

In [28]:
bucket = "dataproc-temp-us-central1-857387732128-knznbrvo"
spark.conf.set('temporaryGcsBucket', bucket)

In [22]:
df.show()

[Stage 2:>                                                          (0 + 1) / 1]

+----------+--------------------+
|Station Id|        Station Name|
+----------+--------------------+
|       383|  Frith Street, Soho|
|       719|Victoria Park Roa...|
|       272|Baylis Road, Wate...|
|       471|Hewison Street, O...|
|       399|Brick Lane Market...|
|       671|Parsons Green Sta...|
|       450|Jubilee Street, S...|
|       780|Imperial Wharf St...|
|       638|Falcon Road, Clap...|
|       647|Richmond Way, She...|
|        86|Sancroft Street, ...|
|       600|South Lambeth Roa...|
|       552|Watney Street, Sh...|
|       518|Antill Road, Mile...|
|       522|Clinton Road, Mil...|
|        78|Sadlers Sports Ce...|
|       409|Strata, Elephant ...|
|        22|Northington Stree...|
|       396|Shouldham Street,...|
|       242|Beaumont Street, ...|
+----------+--------------------+
only showing top 20 rows



                                                                                

In [29]:
df.registerTempTable('trips_data')

In [30]:
df_result = spark.sql("""SELECT *
FROM trips_data
""")

In [9]:
dft.write\
    .format("bigquery")\
    .mode("overwrite")\
    .option("temporaryGcsBucket", "dtc_data_lake_de-71680")\
    .option("database", "de-71680.bicycles_data_all")\
    .option("table", "de-71680.bicycles_data_all.station")\
    .option("createDisposition", "CREATE_IF_NEEDED")\
    .save()

Py4JJavaError: An error occurred while calling o142.save.
: java.lang.ClassNotFoundException: 
Failed to find data source: bigquery. Please find packages at
https://spark.apache.org/third-party-projects.html
       
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedToFindDataSourceError(QueryExecutionErrors.scala:587)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:675)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:725)
	at org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:864)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:256)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: bigquery.DefaultSource
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:661)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:661)
	at scala.util.Failure.orElse(Try.scala:224)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:661)
	... 16 more


In [33]:
df.show()



+----------+--------------------+
|Station Id|        Station Name|
+----------+--------------------+
|       520|Bancroft Road, Be...|
|       352|Vauxhall Street, ...|
|       368|Harriet Street, K...|
|       467| Southern Grove, Bow|
|       740|Sirdar Road, Avon...|
|       402|Penfold Street, M...|
|       750|Culvert Road, Bat...|
|       686|Beryl Road, Hamme...|
|       433|Wren Street, Holborn|
|       764|St. John's Road, ...|
|       105|Westbourne Grove,...|
|       748|Hertford Road, De...|
|       677|Heath Road, Batte...|
|       729|St. Peter's Terra...|
|       312|Grove End Road, S...|
|       456|Parkway, Camden Town|
|       144|Kennington Cross,...|
|        57|Guilford Street ,...|
|       247|St. John's Wood C...|
|       448|Fisherman's Walk ...|
+----------+--------------------+
only showing top 20 rows



                                                                                

In [13]:
df_censo.write\ 
    . формат ( «большой запрос» )\ 
    .mode ( «перезаписать» )\ 
    .option ( «temporaryGcsBucket» , «censo-ensino-superior» )\ 
    .option ( «база данных» , «censo_ensino_superior» )\ 
    .option ( «таблица» , "censo_ensino_superior.cursos_graduacao_e_licenciatura" )\ 
    .option( "createDisposition" , "CREATE_IF_NEEDED" )\ 
    .save()

SyntaxError: unexpected character after line continuation character (2923426471.py, line 1)