-- sp_demo_datastream_public_ip.sql
/*##################################################################################
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###################################################################################*/
/*
YouTube:
- This is for private IP, but is mainly the same demo: https://youtu.be/ow17MIXHOqw
- With public IP you can access the data from your laptop via pgAdmin instead of SSHing to the VM
Prerequisites (10 minutes):
- Disable Org Policy: sql.restrictAuthorizedNetworks
OPEN: https://console.cloud.google.com/iam-admin/orgpolicies/sql-restrictAuthorizedNetworks/edit?project=${project_id}
- Click "Customize"
- Click "Add Rule"
- Click "Off"
- Save
- Execute Airflow DAG: sample-datastream-PUBLIC-ip-deploy (this will deploy a Cloud SQL database, a reverse proxy vm and datastream)
- Re-enable (after the DAG completes) Org Policy: sql.restrictAuthorizedNetworks
OPEN: https://console.cloud.google.com/iam-admin/orgpolicies/sql-restrictAuthorizedNetworks/edit?project=${project_id}
- Click "Inherit parent's policy"
- Save
- Verify the tables are starting to CDC to BigQuery (there will only be 1 row in each until you start the next DAG):
NOTE: The dataset name datastream_public_ip_public has the suffix "public". This means we are syncing the "public" schema from Postgres.
SELECT * FROM `${project_id}.datastream_public_ip_public.driver`;
SELECT * FROM `${project_id}.datastream_public_ip_public.review`;
SELECT * FROM `${project_id}.datastream_public_ip_public.payment`;
- Execute Airflow DAG: sample-datastream-PUBLIC-ip-generate-data (this will generate test data that can be joined to the taxi data)
- NOTE: This will insert 10,000 records into driver and about 1 million each into review and payment
- This data was generated by the stored procedure sp_create_datastream_cdc_data, which creates 10 million rows of data
- The stored procedure can be customized to create any data you would like.
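The org-policy console toggles above can also be scripted. A rough sketch using the gcloud resource-manager surface (an assumption: your account holds an org-policy admin role on the project; verify the command group against your gcloud version):

```
# Disable enforcement of the boolean constraint before running the deploy DAG
gcloud resource-manager org-policies disable-enforce \
    sql.restrictAuthorizedNetworks --project=${project_id}

# After the DAG completes, remove the project-level override so the
# project falls back to inheriting the parent's policy
gcloud resource-manager org-policies delete \
    sql.restrictAuthorizedNetworks --project=${project_id}
```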
(OPTIONAL)
Install pgAdmin to connect to your database
Open this to find the IP Address of your Cloud SQL: https://console.cloud.google.com/sql/instances?project=${project_id}
You will need to edit your database and add your public IP (Google "what's my IP")
- Click Edit
- Expand Connections
- Add your public IP address to "Authorized networks"
- Click Save (it will take a minute)
Username: postgres
Database password (this is the random extension used throughout this demo): ${random_extension}
SELECT * FROM driver;
SELECT * FROM review;
SELECT * FROM payment;
(OPTIONAL)
Customizing the OLTP data:
- You can create your own schema and test data by modifying the below files in your Composer "Data" directory
- postgres_create_schema.sql -> Customize this to create your own tables
- ${project_id}.datastream_cdc_data -> Customize this to create your own test data
Use Cases:
- Offload your analytics from your OLTP databases to BigQuery
- Avoid purchasing (potentially expensive) licenses for packaged software
- Avoid purchasing additional hardware for your OLTP database (or storage SAN) and save on licenses
- Perform analytics across your disparate OLTP databases by CDC-ing them to BigQuery
Description:
- This will CDC data from a Cloud SQL Postgres database
- The database is using a Public IP address
- Datastream has been configured to connect to the public IP address, which has been added to your database's allowed IP addresses
- We join our OLTP (CDC) data for drivers, reviews and payments to the matching trips in our warehouse
Reference:
- n/a
Clean up / Reset script:
- Execute Airflow DAG: sample-datastream-PUBLIC-ip-destroy
DROP VIEW IF EXISTS `${project_id}.${bigquery_taxi_dataset}.looker_datastream`;
*/
-- See the streaming data
SELECT * FROM `${project_id}.datastream_public_ip_public.driver`;
SELECT * FROM `${project_id}.datastream_public_ip_public.review`;
SELECT * FROM `${project_id}.datastream_public_ip_public.payment`;
-- See the counts change (the replication interval is set low, but updates can take up to 15 seconds)
-- Make sure the Airflow DAG sample-datastream-PUBLIC-ip-generate-data is running
SELECT COUNT(*) AS Cnt FROM `${project_id}.datastream_public_ip_public.driver`;
SELECT COUNT(*) AS Cnt FROM `${project_id}.datastream_public_ip_public.review`;
SELECT COUNT(*) AS Cnt FROM `${project_id}.datastream_public_ip_public.payment`;
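-- Beyond row counts, replication lag can be estimated from the metadata column
-- Datastream adds to each destination table. A sketch, assuming the default
-- datastream_metadata.source_timestamp column (epoch milliseconds):
```sql
SELECT MAX(TIMESTAMP_MILLIS(datastream_metadata.source_timestamp)) AS latest_source_change,
       TIMESTAMP_DIFF(CURRENT_TIMESTAMP(),
                      MAX(TIMESTAMP_MILLIS(datastream_metadata.source_timestamp)),
                      SECOND) AS lag_seconds
  FROM `${project_id}.datastream_public_ip_public.review`;
```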
-- We can determine the rating of our drivers based upon the data
-- We can bring ride duration and other fields by joining the CDC data to the warehouse
WITH driver_data AS
(
SELECT driver_id, driver_name
FROM `${project_id}.datastream_public_ip_public.driver`
)
, overall_drive_rating AS
(
SELECT driver_id, AVG(CAST(review_rating AS FLOAT64)) AS average_rating
FROM `${project_id}.datastream_public_ip_public.review`
GROUP BY 1
)
, warehouse_data AS
(
SELECT trips.TaxiCompany,
trips.PULocationID,
trips.Passenger_Count,
trips.Trip_Distance,
TIMESTAMP_DIFF(trips.Dropoff_DateTime, trips.Pickup_DateTime, MINUTE) AS ride_duration_in_minutes,
reviews.driver_id
FROM `${project_id}.datastream_public_ip_public.review` AS reviews
INNER JOIN `${project_id}.${bigquery_taxi_dataset}.taxi_trips` AS trips
ON reviews.ride_date = CAST(trips.Pickup_DateTime AS DATE)
AND reviews.pickup_location_id = trips.PULocationID
AND reviews.dropoff_location_id = trips.DOLocationID
AND reviews.total_amount = trips.Total_Amount
)
SELECT warehouse_data.*,
driver_data.driver_name,
overall_drive_rating.average_rating
FROM warehouse_data
LEFT JOIN overall_drive_rating
ON warehouse_data.driver_id = overall_drive_rating.driver_id
LEFT JOIN driver_data
ON warehouse_data.driver_id = driver_data.driver_id
ORDER BY driver_data.driver_name ;
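-- The overall_drive_rating CTE above can also stand on its own to rank drivers
-- from the CDC data alone (no warehouse join). A minimal sketch:
```sql
WITH overall_drive_rating AS
(
    SELECT driver_id, AVG(CAST(review_rating AS FLOAT64)) AS average_rating
      FROM `${project_id}.datastream_public_ip_public.review`
     GROUP BY 1
)
SELECT driver.driver_name,
       ROUND(overall_drive_rating.average_rating, 2) AS average_rating
  FROM overall_drive_rating
       INNER JOIN `${project_id}.datastream_public_ip_public.driver` AS driver
               ON overall_drive_rating.driver_id = driver.driver_id
 ORDER BY average_rating DESC
 LIMIT 10;
```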
-- Optional: perform machine learning on the data or use it in a dashboard to show trends
-- See the payment data to see customers who are paying with credit cards
WITH credit_card_data AS
(
SELECT driver_id, ride_date, passenger_id,credit_card_number, pickup_location_id, dropoff_location_id, total_amount
FROM `${project_id}.datastream_public_ip_public.payment`
)
SELECT driver.driver_name,
payment.passenger_id,
payment.credit_card_number,
trips.*
FROM `${project_id}.datastream_public_ip_public.driver` AS driver
INNER JOIN credit_card_data AS payment
ON payment.driver_id = driver.driver_id
LEFT JOIN `${project_id}.${bigquery_taxi_dataset}.taxi_trips` AS trips
ON payment.ride_date = CAST(trips.Pickup_DateTime AS DATE)
AND payment.pickup_location_id = trips.PULocationID
AND payment.dropoff_location_id = trips.DOLocationID
AND payment.total_amount = trips.Total_Amount
AND trips.Payment_Type_Id = 1 -- credit card
ORDER BY driver.driver_name;
-- Counts should be increasing
SELECT COUNT(*) AS Cnt FROM `${project_id}.datastream_public_ip_public.driver`;
SELECT COUNT(*) AS Cnt FROM `${project_id}.datastream_public_ip_public.review`;
SELECT COUNT(*) AS Cnt FROM `${project_id}.datastream_public_ip_public.payment`;
-- To show Deletes (CDC)
-- Stop the Airflow job: sample-datastream-PUBLIC-ip-generate-data
-- Log into Postgres (via pgAdmin) and run the following:
/*
DELETE FROM review WHERE review_id < 100000;
DELETE FROM payment WHERE review_id < 100000;
UPDATE driver SET driver_name = 'Google Waymo Driverless' WHERE driver_id = 1;
*/
-- Wait a few minutes and you should see the data updated/deleted in BigQuery
SELECT * FROM `${project_id}.datastream_public_ip_public.driver` WHERE driver_id = 1;
SELECT COUNT(*) AS Cnt FROM `${project_id}.datastream_public_ip_public.review`;
SELECT COUNT(*) AS Cnt FROM `${project_id}.datastream_public_ip_public.payment`;
-- Build a Looker Studio report to show the data
CREATE OR REPLACE VIEW `${project_id}.${bigquery_taxi_dataset}.looker_datastream`
AS
SELECT driver.driver_name,
location.borough,
location.zone,
DATE_TRUNC(review.ride_date, WEEK) AS ride_week,
review_rating
FROM `${project_id}.datastream_public_ip_public.driver` AS driver
INNER JOIN `${project_id}.datastream_public_ip_public.review` AS review
ON driver.driver_id = review.driver_id
INNER JOIN `${project_id}.${bigquery_taxi_dataset}.taxi_trips` AS trips
ON review.ride_date = CAST(trips.Pickup_DateTime AS DATE)
AND review.pickup_location_id = trips.PULocationID
AND review.dropoff_location_id = trips.DOLocationID
AND review.total_amount = trips.Total_Amount
INNER JOIN `${project_id}.${bigquery_taxi_dataset}.location` AS location
ON trips.PULocationID = location.location_id;
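-- The view can be sanity-checked before wiring it into Looker Studio, e.g. the
-- weekly rating trend per borough the report would chart (same CAST as the
-- rating query earlier in this script):
```sql
SELECT borough,
       ride_week,
       AVG(CAST(review_rating AS FLOAT64)) AS average_rating,
       COUNT(*) AS review_count
  FROM `${project_id}.${bigquery_taxi_dataset}.looker_datastream`
 GROUP BY borough, ride_week
 ORDER BY borough, ride_week;
```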
-- Show the Looker report:
/*
Clone this report: https://lookerstudio.google.com/reporting/3d85b29a-614c-4f70-b5ef-2c24a9ee0737
Click the 3 dots in the top right and select "Make a copy"
Click "Copy Report"
Click the "Resource" menu then "Manage added data sources"
Click "Edit" under Actions title
Click "${project_id}" (or enter the Project Id) under Project title
Click "${bigquery_taxi_dataset}" under Dataset title
Click "looker_datastream" under Table title
Click "Reconnect"
Click "Apply" - there should be no field changes
Click "Done" - in top right
Click "Close" - in top right
You can now see the data
*/
SELECT * FROM `${project_id}.${bigquery_taxi_dataset}.looker_datastream`;