/
slack_pipeline.py
97 lines (70 loc) · 2.21 KB
/
slack_pipeline.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
"""Pipeline to load slack into bigquery."""
from typing import List
import dlt
import pendulum
from pendulum import datetime
from slack import slack_source
def load_channels() -> None:
    """Load the list of Slack channels in the workspace into BigQuery.

    Creates the ``slack`` pipeline (BigQuery destination, ``slack_dlt``
    dataset), restricts the Slack source to its ``channels`` resource, runs
    the pipeline and prints the load info it returns.
    """
    # The pipeline is created before the source, same as the original order.
    slack_pipeline = dlt.pipeline(
        pipeline_name="slack",
        destination="bigquery",
        dataset_name="slack_dlt",
    )
    all_resources = slack_source(page_size=20, selected_channels=None)
    channels_only = all_resources.with_resources("channels")
    print(slack_pipeline.run(channels_only))
def get_resources() -> List[str]:
    """Return the names of the Slack source's per-channel resources.

    The source is instantiated with ``selected_channels=None`` and the names
    in its ``resources`` mapping are collected, leaving out the three
    non-channel resources (``channels``, ``access_logs`` and ``users``) so
    that the remaining ones can be loaded one at a time.

    Returns:
        A list of resource names, in the iteration order of the source's
        resource mapping.
    """
    non_channel_resources = ("channels", "access_logs", "users")
    resource_dict = slack_source(
        page_size=20,
        selected_channels=None,
    ).resources
    # Filter rather than pop: this leaves the source's resource mapping
    # untouched, does not raise KeyError when one of the non-channel
    # resources is absent, and yields a real list as the annotation promises
    # (the previous code returned a dict keys view).
    return [
        name for name in resource_dict.keys() if name not in non_channel_resources
    ]
def load_channel_history(channel: str, start_date: datetime) -> None:
    """Load a single Slack channel into BigQuery, starting at ``start_date``.

    The Slack source is created with ``channel`` as its only selected channel
    and with the given ``start_date``, then narrowed to the resource named
    ``channel`` before the ``slack`` pipeline is run. The resulting load info
    is printed.

    Args:
        channel: Name of the channel; used both as the selected channel and
            as the name of the resource to load.
        start_date: Passed through to the source as its ``start_date``; the
            load is intended to be incremental from this point.
    """
    # The pipeline is created before the source, same as the original order.
    slack_pipeline = dlt.pipeline(
        pipeline_name="slack",
        destination="bigquery",
        dataset_name="slack_dlt",
    )
    channel_source = slack_source(
        page_size=20,
        selected_channels=[channel],
        start_date=start_date,
    )
    history_only = channel_source.with_resources(channel)
    result = slack_pipeline.run(history_only)
    print(result)
def get_users() -> None:
    """Load the Slack users list into BigQuery.

    Despite the ``get_`` prefix this returns nothing: it runs the ``slack``
    pipeline against the source's ``users`` resource and prints the load
    info.
    """
    # The pipeline is created before the source, same as the original order.
    slack_pipeline = dlt.pipeline(
        pipeline_name="slack",
        destination="bigquery",
        dataset_name="slack_dlt",
    )
    users_only = slack_source(page_size=20).with_resources("users")
    result = slack_pipeline.run(users_only)
    print(result)
if __name__ == "__main__":
    # Optional allow-list of channel resources; None means "load all of them".
    channels = None
    # Channel history is loaded starting from yesterday's date.
    start_date = pendulum.now().subtract(days=1).date()

    # 1. Channel list.
    load_channels()

    # 2. History of each channel resource, one pipeline run per resource.
    resources = get_resources()
    for resource in resources:
        if channels is None or resource in channels:
            load_channel_history(resource, start_date=start_date)

    # 3. Users list.
    get_users()