# [Cloud Dataflow](https://cloud.google.com/dataflow/docs/quickstarts/quickstart-python?hl=ja)

## タイムゾーンの変更

In [1]:
!rm /etc/localtime
!ln -s /usr/share/zoneinfo/Asia/Tokyo /etc/localtime
!date

Wed Mar 30 15:46:53 JST 2022


## Apache Beam SDK を入手する

In [2]:
!pip install apache-beam[gcp]



## Google 認証

In [3]:
from google.colab import auth
auth.authenticate_user()

## 資材準備

In [4]:
!touch requirements.txt

In [5]:
%%writefile setup.py
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Setup.py module for the workflow's worker utilities.

All the workflow related code is gathered in a package that will be built as a
source distribution, staged in the staging area for the workflow being run and
then installed in the workers when they start running.

This behavior is triggered by specifying the --setup_file command line option
when running the workflow for remote execution.
"""

# pytype: skip-file

from __future__ import absolute_import
from __future__ import print_function

import subprocess
import setuptools
from distutils.command.build import build as _build  # type: ignore

# This class handles the pip install mechanism.
class build(_build):  # pylint: disable=invalid-name
  """A build command class that will be invoked during package install.

  The package built using the current setup.py will be staged and later
  installed in the worker using `pip install package'. This class will be
  instantiated during install for this specific scenario and will trigger
  running the custom commands specified.
  """
  sub_commands = _build.sub_commands + [('CustomCommands', None)]


# Some custom command to run during setup. The command is not essential for this
# workflow. It is used here as an example. Each command will spawn a child
# process. Typically, these commands will include steps to install non-Python
# packages. For instance, to install a C++-based library libjpeg62 the following
# two commands will have to be added:
#
#     ['apt-get', 'update'],
#     ['apt-get', '--assume-yes', 'install', 'libjpeg62'],
#
# First, note that there is no need to use the sudo command because the setup
# script runs with appropriate access.
# Second, if apt-get tool is used then the first command needs to be 'apt-get
# update' so the tool refreshes itself and initializes links to download
# repositories.  Without this initial step the other apt-get install commands
# will fail with package not found errors. Note also --assume-yes option which
# shortcuts the interactive confirmation.
#
# Note that in this example custom commands will run after installing required
# packages. If you have a PyPI package that depends on one of the custom
# commands, move installation of the dependent package to the list of custom
# commands, e.g.:
#
#     ['pip', 'install', 'my_package'],
#
# TODO(BEAM-3237): Output from the custom commands are missing from the logs.
# The output of custom commands (including failures) will be logged in the
# worker-startup log.
CUSTOM_COMMANDS = [['apt-get', 'update'],
                   ['apt-get', '--assume-yes', 'install', 'mecab'],
                   ['apt-get', '--assume-yes', 'install', 'libmecab-dev'],
                   ['apt-get', '--assume-yes', 'install', 'mecab-ipadic-utf8'],
                   ['pip3', 'install', 'google-cloud-language==1.3.0'],
                   ['pip3', 'install', 'mecab-python3'],
                   ['apt-get', '--assume-yes', 'install', 'git', 'make', 'curl', 'xz-utils', 'file'],
                   ['apt-get', '--assume-yes', 'install', 'unzip'],
                   ['wget', 'https://github.com/neologd/mecab-ipadic-neologd/archive/master.zip', '-O', 'mecab-ipadic-neologd.zip'],
                   ['unzip', '-o', 'mecab-ipadic-neologd.zip'],
                   ['./mecab-ipadic-neologd-master/bin/install-mecab-ipadic-neologd', '-n', '-y']]


class CustomCommands(setuptools.Command):
  """A setuptools Command class able to run arbitrary commands."""
  def initialize_options(self):
    pass

  def finalize_options(self):
    pass

  def RunCustomCommand(self, command_list):
    print('Running command: %s' % command_list)
    p = subprocess.Popen(
        command_list,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT)
    # Can use communicate(input='y\n'.encode()) if the command run requires
    # some confirmation.
    stdout_data, _ = p.communicate()
    print('Command output: %s' % stdout_data)
    if p.returncode != 0:
      raise RuntimeError(
          'Command %s failed: exit code: %s' % (command_list, p.returncode))

  def run(self):
    for command in CUSTOM_COMMANDS:
      self.RunCustomCommand(command)


# Configure the required packages and scripts to install.
# Note that the Python Dataflow containers come with numpy already installed
# so this dependency will not trigger anything to be installed unless a version
# restriction is specified.
REQUIRED_PACKAGES = [
    'mecab-python3', 'unidic-lite'
]

setuptools.setup(
    name='mecab-neologd',
    version='0.0.1',
    author='kanou',
    author_email='kanou@solairo.co.jp',
    url='https://www.solairo.co.jp', 
    description='MeCab NEologd set workflow package.',
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages(),
    cmdclass={
        # Command class instantiated and run during pip install scenarios.
        'build': build,
        'CustomCommands': CustomCommands,
    })

Writing setup.py


### whatya_v2_all

In [6]:
%%writefile dataflow_whatya_v2_all.py
import argparse
import logging
import sys
from datetime import datetime
from apache_beam.io.gcp import gcsio
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions, SetupOptions, WorkerOptions

class AnalyzeSentiment(beam.DoFn):
    def process(self, context):
        import time
        from google.cloud import language
        from google.cloud.language import enums
        from google.cloud.language import types
        client = language.LanguageServiceClient()

        text = context["content"]
        language = "ja"
        document = types.Document(
            content=text,
            type=enums.Document.Type.PLAIN_TEXT,
            language=language)
    
        sentiment = client.analyze_sentiment(document).document_sentiment

        num = context["num"]
        score = sentiment.score
        magnitude = sentiment.magnitude

        time.sleep(0.1)

        return [{'date': context["date"],
                 'client': context["client"],
                 'content': text,
                 'num': num,
                 'score': score,
                 'magnitude': magnitude
                }]

def pipeline(options=None, known_args=None):
    """
    BQにクエリを投げ、別のテーブルに出力するパイプライン

    :param options:
    :param known_args:
    :return:
    """
    project = options.view_as(GoogleCloudOptions).project
    dataset = known_args.dataset
    input = known_args.input
    output = known_args.output
    # client = known_args.client
    from_date = known_args.from_date
    to_date = known_args.to_date

    query = """
WITH t AS (
  SELECT
    date,
    client,
    IFNULL(quest_item_name, ctalk_quest_message) AS content,
    count(*) as num
  from {}.{}.{}
  where
    (client in (SELECT DISTINCT(client_id) FROM `bwing-230309.whatya.client_master`) AND date BETWEEN DATE('{}', "Asia/Tokyo") AND DATE('{}', "Asia/Tokyo"))
    AND regexp_extract(quest_item_value, '(000_op)') IS NULL
    AND mtype in ('customer')
  group by date, client, quest_item_name, ctalk_quest_message
  order by num desc
)
SELECT
  date,
  client,
  content,
  num
FROM t
WHERE
    content != ''
    AND regexp_extract(content, '(init|init_bot|init_op|テスト|000_op|string_string_chip)') IS NULL
GROUP BY date, client, content, num
ORDER BY num DESC
""".format(project, dataset, input, from_date, to_date)

    ## 新たなパイプラインを生成
    q = beam.Pipeline(options=options)

    # クエリを実行
    #lines = q | 'ReadFromBigQuery' >> beam.io.Read(beam.io.BigQuerySource(query=query, use_standard_sql=True))
    lines = q | 'ReadFromBigQuery' >> beam.io.Read(beam.io.gcp.bigquery.ReadFromBigQuery(query=query, use_standard_sql=True))

    lines = lines | 'Analyze Sentiment' >> beam.ParDo(AnalyzeSentiment())

    # 別のテーブルに書き込む
    lines | 'WriteToBQ' >> beam.io.WriteToBigQuery(project=project, dataset=dataset, table=output, 
                                                            schema='date:DATE, client:STRING, content:STRING, num:INTEGER, score:FLOAT64, magnitude:FLOAT64',
                                                            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                                                            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
                                                            # write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)
    # 実行する
    q.run().wait_until_finish()

def run(argv=None, save_main_session=True):
    """Main entry point; defines and runs the dataflow pipeline."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--input', dest='input', required=True, help='Input table')
    parser.add_argument('--output', dest='output', required=True, help='Output table')
    parser.add_argument('--project', dest='project', required=True, help='project')
    parser.add_argument('--region', dest='region', required=True, help='region')
    parser.add_argument('--dataset', dest='dataset', required=True, help='dataset')
    # parser.add_argument('--client', dest='client', required=True, help='client')
    parser.add_argument('--from_date', dest='from_date', required=True, help='From Date(YYYY-MM-DD)')
    parser.add_argument('--to_date', dest='to_date', required=True, help='To Date(YYYY-MM-DD)')
    parser.add_argument('--bucket', dest='bucket', required=True, help='bucket')

    known_args, pipeline_args = parser.parse_known_args(argv)

    # PipelineOptionを設定
    options = PipelineOptions(pipeline_args)

    google_cloud_options = options.view_as(GoogleCloudOptions)
    google_cloud_options.project = known_args.project
    google_cloud_options.region = known_args.region
    google_cloud_options.staging_location = '{}/code/'.format(known_args.bucket)
    google_cloud_options.temp_location = '{}/temp/'.format(known_args.bucket)

    worker_options = options.view_as(WorkerOptions)
    worker_options.disk_size_gb = 30
    worker_options.max_num_workers = 1
    worker_options.machine_type = 'n1-standard-2'
    # worker_options.use_public_ips = True

    options.view_as(StandardOptions).runner = 'DataflowRunner'
    setup_option = options.view_as(SetupOptions)
    setup_option.requirements_file = "./requirements.txt"
    setup_option.view_as(SetupOptions).setup_file = './setup.py'

    # BQからBQ
    pipeline(options, known_args)

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run(sys.argv)


Writing dataflow_whatya_v2_all.py


### whatya_v2_wlp

In [7]:
%%writefile dataflow_whatya_v2_wlp.py
import argparse
import logging
import sys
from datetime import datetime
from apache_beam.io.gcp import gcsio
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions, SetupOptions, WorkerOptions

class AnalyzeSentiment(beam.DoFn):
    def process(self, context):
        import time
        from google.cloud import language
        from google.cloud.language import enums
        from google.cloud.language import types

        client = language.LanguageServiceClient()

        text = context["content"]
        language = "ja"
        document = types.Document(
            content=text,
            type=enums.Document.Type.PLAIN_TEXT,
            language=language)
    
        sentiment = client.analyze_sentiment(document).document_sentiment

        num = context["num"]
        score = sentiment.score
        magnitude = sentiment.magnitude

        time.sleep(0.1)

        return [{'date': context["date"],
                 'client': context["client"],
                 'content': text,
                 'num': num,
                 'score': score,
                 'magnitude': magnitude
                }]

def pipeline(options=None, known_args=None):
    """
    BQにクエリを投げ、別のテーブルに出力するパイプライン

    :param options:
    :param known_args:
    :return:
    """
    project = options.view_as(GoogleCloudOptions).project
    dataset = known_args.dataset
    input = known_args.input
    output = known_args.output
    # client = known_args.client
    from_date = known_args.from_date
    to_date = known_args.to_date

    query = """
WITH t AS (
  SELECT
    date,
    client,
    IFNULL(quest_item_name, ctalk_quest_message) AS content,
    count(*) as num
  from {}.{}.{}
  where
    (REGEXP_EXTRACT(client, r'wlp-[0-9]+') IS NOT NULL OR client in ('lla'))
    AND date BETWEEN DATE('{}', "Asia/Tokyo") AND DATE('{}', "Asia/Tokyo")
    AND regexp_extract(quest_item_value, '(000_op)') IS NULL
    AND mtype in ('customer')
  group by date, client, quest_item_name, ctalk_quest_message
  order by num desc
)
SELECT
  date,
  client,
  content,
  num
FROM t
WHERE
    content != ''
    AND regexp_extract(content, '(init|init_bot|init_op|テスト|000_op|string_string_chip)') IS NULL
GROUP BY date, client, content, num
ORDER BY num DESC
""".format(project, dataset, input, from_date, to_date)

    ## 新たなパイプラインを生成
    q = beam.Pipeline(options=options)

    # クエリを実行
    #lines = q | 'ReadFromBigQuery' >> beam.io.Read(beam.io.BigQuerySource(query=query, use_standard_sql=True))
    lines = q | 'ReadFromBigQuery' >> beam.io.Read(beam.io.gcp.bigquery.ReadFromBigQuery(query=query, use_standard_sql=True))

    lines = lines | 'Analyze Sentiment' >> beam.ParDo(AnalyzeSentiment())

    # 別のテーブルに書き込む
    lines | 'WriteToBQ' >> beam.io.WriteToBigQuery(project=project, dataset=dataset, table=output, 
                                                            schema='date:DATE, client:STRING, content:STRING, num:INTEGER, score:FLOAT64, magnitude:FLOAT64',
                                                            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                                                            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
                                                            # write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)
    # 実行する
    q.run().wait_until_finish()

def run(argv=None, save_main_session=True):
    """Main entry point; defines and runs the dataflow pipeline."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--input', dest='input', required=True, help='Input table')
    parser.add_argument('--output', dest='output', required=True, help='Output table')
    parser.add_argument('--project', dest='project', required=True, help='project')
    parser.add_argument('--region', dest='region', required=True, help='region')
    parser.add_argument('--dataset', dest='dataset', required=True, help='dataset')
    # parser.add_argument('--client', dest='client', required=True, help='client')
    parser.add_argument('--from_date', dest='from_date', required=True, help='From Date(YYYY-MM-DD)')
    parser.add_argument('--to_date', dest='to_date', required=True, help='To Date(YYYY-MM-DD)')
    parser.add_argument('--bucket', dest='bucket', required=True, help='bucket')

    known_args, pipeline_args = parser.parse_known_args(argv)

    # PipelineOptionを設定
    options = PipelineOptions(pipeline_args)

    google_cloud_options = options.view_as(GoogleCloudOptions)
    google_cloud_options.project = known_args.project
    google_cloud_options.region = known_args.region
    google_cloud_options.staging_location = '{}/code/'.format(known_args.bucket)
    google_cloud_options.temp_location = '{}/temp/'.format(known_args.bucket)

    worker_options = options.view_as(WorkerOptions)
    worker_options.disk_size_gb = 30
    worker_options.max_num_workers = 1
    worker_options.machine_type = 'n1-standard-2'
    # worker_options.use_public_ips = True

    options.view_as(StandardOptions).runner = 'DataflowRunner'
    setup_option = options.view_as(SetupOptions)
    setup_option.requirements_file = "./requirements.txt"
    setup_option.view_as(SetupOptions).setup_file = './setup.py'

    # BQからBQ
    pipeline(options, known_args)

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run(sys.argv)


Writing dataflow_whatya_v2_wlp.py


### whatya_v2_zozo

In [8]:
%%writefile dataflow_whatya_v2_zozo.py
import argparse
import logging
import sys
from datetime import datetime
from apache_beam.io.gcp import gcsio
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions, SetupOptions, WorkerOptions

class AnalyzeSentiment(beam.DoFn):
    def process(self, context):
        import time
        from google.cloud import language
        from google.cloud.language import enums
        from google.cloud.language import types
        client = language.LanguageServiceClient()

        text = context["content"]
        language = "ja"
        document = types.Document(
            content=text,
            type=enums.Document.Type.PLAIN_TEXT,
            language=language)
    
        sentiment = client.analyze_sentiment(document).document_sentiment

        num = context["num"]
        score = sentiment.score
        magnitude = sentiment.magnitude

        time.sleep(0.1)

        return [{'date': context["date"],
                 'client': context["client"],
                 'content': text,
                 'num': num,
                 'score': score,
                 'magnitude': magnitude
                }]

def pipeline(options=None, known_args=None):
    """
    BQにクエリを投げ、別のテーブルに出力するパイプライン

    :param options:
    :param known_args:
    :return:
    """
    project = options.view_as(GoogleCloudOptions).project
    dataset = known_args.dataset
    input = known_args.input
    output = known_args.output
    # client = known_args.client
    from_date = known_args.from_date
    to_date = known_args.to_date

    query = """
WITH t AS (
  SELECT
    date,
    client,
    IFNULL(quest_item_name, ctalk_quest_message) AS content,
    count(*) as num
  from {}.{}.{}
  where
    (client in ('z00010j', 'z00011d', 'z00012l', 'z00013h', 'z00014a', 'z00016q', 'z00017e') AND date BETWEEN DATE('{}', "Asia/Tokyo") AND DATE('{}', "Asia/Tokyo"))
    AND regexp_extract(quest_item_value, '(000_op)') IS NULL
    AND mtype in ('customer')
  group by date, client, quest_item_name, ctalk_quest_message
  order by num desc
)
SELECT
  date,
  client,
  content,
  num
FROM t
WHERE
    content != ''
    AND regexp_extract(content, '(init|init_bot|init_op|テスト|000_op|string_string_chip)') IS NULL
GROUP BY date, client, content, num
ORDER BY num DESC
""".format(project, dataset, input, from_date, to_date)

    ## 新たなパイプラインを生成
    q = beam.Pipeline(options=options)

    # クエリを実行
    #lines = q | 'ReadFromBigQuery' >> beam.io.Read(beam.io.BigQuerySource(query=query, use_standard_sql=True))
    lines = q | 'ReadFromBigQuery' >> beam.io.Read(beam.io.gcp.bigquery.ReadFromBigQuery(query=query, use_standard_sql=True))

    lines = lines | 'Analyze Sentiment' >> beam.ParDo(AnalyzeSentiment())

    # 別のテーブルに書き込む
    lines | 'WriteToBQ' >> beam.io.WriteToBigQuery(project=project, dataset=dataset, table=output, 
                                                            schema='date:DATE, client:STRING, content:STRING, num:INTEGER, score:FLOAT64, magnitude:FLOAT64',
                                                            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                                                            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
                                                            # write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)
    # 実行する
    q.run().wait_until_finish()

def run(argv=None, save_main_session=True):
    """Main entry point; defines and runs the dataflow pipeline."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--input', dest='input', required=True, help='Input table')
    parser.add_argument('--output', dest='output', required=True, help='Output table')
    parser.add_argument('--project', dest='project', required=True, help='project')
    parser.add_argument('--region', dest='region', required=True, help='region')
    parser.add_argument('--dataset', dest='dataset', required=True, help='dataset')
    # parser.add_argument('--client', dest='client', required=True, help='client')
    parser.add_argument('--from_date', dest='from_date', required=True, help='From Date(YYYY-MM-DD)')
    parser.add_argument('--to_date', dest='to_date', required=True, help='To Date(YYYY-MM-DD)')
    parser.add_argument('--bucket', dest='bucket', required=True, help='bucket')

    known_args, pipeline_args = parser.parse_known_args(argv)

    # PipelineOptionを設定
    options = PipelineOptions(pipeline_args)

    google_cloud_options = options.view_as(GoogleCloudOptions)
    google_cloud_options.project = known_args.project
    google_cloud_options.region = known_args.region
    google_cloud_options.staging_location = '{}/code/'.format(known_args.bucket)
    google_cloud_options.temp_location = '{}/temp/'.format(known_args.bucket)

    worker_options = options.view_as(WorkerOptions)
    worker_options.disk_size_gb = 30
    worker_options.max_num_workers = 1
    worker_options.machine_type = 'n1-standard-2'
    # worker_options.use_public_ips = True

    options.view_as(StandardOptions).runner = 'DataflowRunner'
    setup_option = options.view_as(SetupOptions)
    setup_option.requirements_file = "./requirements.txt"
    setup_option.view_as(SetupOptions).setup_file = './setup.py'

    # BQからBQ
    pipeline(options, known_args)

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run(sys.argv)


Writing dataflow_whatya_v2_zozo.py


### oksky_chat_all

In [9]:
%%writefile dataflow_oksky_chat_all.py
import argparse
import logging
import sys
from datetime import datetime
from apache_beam.io.gcp import gcsio
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions, SetupOptions, WorkerOptions

class AnalyzeSentiment(beam.DoFn):
    def process(self, context):
        import time
        from google.cloud import language
        from google.cloud.language import enums
        from google.cloud.language import types

        client = language.LanguageServiceClient()

        text = context["content"]
        language = "ja"
        document = types.Document(
            content=text,
            type=enums.Document.Type.PLAIN_TEXT,
            language=language)
    
        sentiment = client.analyze_sentiment(document).document_sentiment

        num = context["num"]
        score = sentiment.score
        magnitude = sentiment.magnitude

        time.sleep(0.1)

        return [{'date': context["date"],
                 'client': context["client"],
                 'content': text,
                 'num': num,
                 'score': score,
                 'magnitude': magnitude
                }]

def pipeline(options=None, known_args=None):
    """
    BQにクエリを投げ、別のテーブルに出力するパイプライン

    :param options:
    :param known_args:
    :return:
    """
    project = options.view_as(GoogleCloudOptions).project
    dataset = known_args.dataset
    input = known_args.input
    output = known_args.output
    # client = known_args.client
    from_date = known_args.from_date
    to_date = known_args.to_date

    query = """
SELECT
  date,
  client,
  content,
  num
FROM {}.{}.{}
WHERE
  date BETWEEN DATE('{}', "Asia/Tokyo") AND DATE('{}', "Asia/Tokyo")
ORDER BY num DESC
""".format(project, dataset, input, from_date, to_date)

    ## 新たなパイプラインを生成
    q = beam.Pipeline(options=options)

    # クエリを実行
    #lines = q | 'ReadFromBigQuery' >> beam.io.Read(beam.io.BigQuerySource(query=query, use_standard_sql=True))
    lines = q | 'ReadFromBigQuery' >> beam.io.Read(beam.io.gcp.bigquery.ReadFromBigQuery(query=query, use_standard_sql=True))

    lines = lines | 'Analyze Sentiment' >> beam.ParDo(AnalyzeSentiment())

    # 別のテーブルに書き込む
    lines | 'WriteToBQ' >> beam.io.WriteToBigQuery(project=project, dataset=dataset, table=output, 
                                                            schema='date:DATE, client:STRING, content:STRING, num:INTEGER, score:FLOAT64, magnitude:FLOAT64',
                                                            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                                                            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
                                                            # write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)
    # 実行する
    q.run().wait_until_finish()

def run(argv=None, save_main_session=True):
    """Main entry point; defines and runs the dataflow pipeline."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--input', dest='input', required=True, help='Input table')
    parser.add_argument('--output', dest='output', required=True, help='Output table')
    parser.add_argument('--project', dest='project', required=True, help='project')
    parser.add_argument('--region', dest='region', required=True, help='region')
    parser.add_argument('--dataset', dest='dataset', required=True, help='dataset')
    # parser.add_argument('--client', dest='client', required=True, help='client')
    parser.add_argument('--from_date', dest='from_date', required=True, help='From Date(YYYY-MM-DD)')
    parser.add_argument('--to_date', dest='to_date', required=True, help='To Date(YYYY-MM-DD)')
    parser.add_argument('--bucket', dest='bucket', required=True, help='bucket')

    known_args, pipeline_args = parser.parse_known_args(argv)

    # PipelineOptionを設定
    options = PipelineOptions(pipeline_args)

    google_cloud_options = options.view_as(GoogleCloudOptions)
    google_cloud_options.project = known_args.project
    google_cloud_options.region = known_args.region
    google_cloud_options.staging_location = '{}/code/'.format(known_args.bucket)
    google_cloud_options.temp_location = '{}/temp/'.format(known_args.bucket)

    worker_options = options.view_as(WorkerOptions)
    worker_options.disk_size_gb = 30
    worker_options.max_num_workers = 1
    worker_options.machine_type = 'n1-standard-2'
    # worker_options.use_public_ips = True

    options.view_as(StandardOptions).runner = 'DataflowRunner'
    setup_option = options.view_as(SetupOptions)
    setup_option.requirements_file = "./requirements.txt"
    setup_option.view_as(SetupOptions).setup_file = './setup.py'

    # BQからBQ
    pipeline(options, known_args)

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run(sys.argv)


Writing dataflow_oksky_chat_all.py


### whatya_v1_kosfja

In [10]:
%%writefile dataflow_whatya_v1_kosfja.py
import argparse
import logging
import sys
from datetime import datetime
from apache_beam.io.gcp import gcsio
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions, SetupOptions, WorkerOptions

class AnalyzeSentiment(beam.DoFn):
    def process(self, context):
        import time
        from google.cloud import language
        from google.cloud.language import enums
        from google.cloud.language import types
        client = language.LanguageServiceClient()

        text = context["content"]
        language = "ja"
        document = types.Document(
            content=text,
            type=enums.Document.Type.PLAIN_TEXT,
            language=language)
    
        sentiment = client.analyze_sentiment(document).document_sentiment

        num = context["num"]
        score = sentiment.score
        magnitude = sentiment.magnitude

        time.sleep(0.1)

        return [{'date': context["date"],
                 'client': context["client"],
                 'content': text,
                 'num': num,
                 'score': score,
                 'magnitude': magnitude
                }]

def pipeline(options=None, known_args=None):
    """
    BQにクエリを投げ、別のテーブルに出力するパイプライン

    :param options:
    :param known_args:
    :return:
    """
    project = options.view_as(GoogleCloudOptions).project
    dataset = known_args.dataset
    input = known_args.input
    output = known_args.output
    client = known_args.client
    from_date = known_args.from_date
    to_date = known_args.to_date

    query = """
SELECT
  date,
  client,
  quest_item_name AS content,
  count(*) as num
from {}.{}.{}
where
  (client = '{}' AND date BETWEEN DATE('{}', "Asia/Tokyo") AND DATE('{}', "Asia/Tokyo"))
  AND quest_item_value = ''
  AND regexp_extract(quest_item_name, '(init|init_bot|テスト|TEST|使い方|肌チェック|完了|肌診断終了|肌終了|肌id AIチャットボットに質問|^[1-9].*|^https?://|^line://|^[a-z|A−Z|あ-ん|0-9|０−９]$)') IS NULL
group by date, client, quest_item_name
order by num desc
""".format(project, dataset, input, client, from_date, to_date)

    ## 新たなパイプラインを生成
    q = beam.Pipeline(options=options)

    # クエリを実行
    #lines = q | 'ReadFromBigQuery' >> beam.io.Read(beam.io.BigQuerySource(query=query, use_standard_sql=True))
    lines = q | 'ReadFromBigQuery' >> beam.io.Read(beam.io.gcp.bigquery.ReadFromBigQuery(query=query, use_standard_sql=True))

    lines = lines | 'Analyze Sentiment' >> beam.ParDo(AnalyzeSentiment())

    # 別のテーブルに書き込む
    lines | 'WriteToBQ' >> beam.io.WriteToBigQuery(project=project, dataset=dataset, table=output, 
                                                            schema='date:DATE, client:STRING, content:STRING, num:INTEGER, score:FLOAT64, magnitude:FLOAT64',
                                                            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                                                            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
                                                            # write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)
    # 実行する
    q.run().wait_until_finish()

def run(argv=None, save_main_session=True):
    """Main entry point; defines and runs the dataflow pipeline."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--input', dest='input', required=True, help='Input table')
    parser.add_argument('--output', dest='output', required=True, help='Output table')
    parser.add_argument('--project', dest='project', required=True, help='project')
    parser.add_argument('--region', dest='region', required=True, help='region')
    parser.add_argument('--dataset', dest='dataset', required=True, help='dataset')
    parser.add_argument('--client', dest='client', required=True, help='client')
    parser.add_argument('--from_date', dest='from_date', required=True, help='From Date(YYYY-MM-DD)')
    parser.add_argument('--to_date', dest='to_date', required=True, help='To Date(YYYY-MM-DD)')
    parser.add_argument('--bucket', dest='bucket', required=True, help='bucket')

    known_args, pipeline_args = parser.parse_known_args(argv)

    # PipelineOptionを設定
    options = PipelineOptions(pipeline_args)

    google_cloud_options = options.view_as(GoogleCloudOptions)
    google_cloud_options.project = known_args.project
    google_cloud_options.region = known_args.region
    google_cloud_options.staging_location = '{}/code/'.format(known_args.bucket)
    google_cloud_options.temp_location = '{}/temp/'.format(known_args.bucket)

    worker_options = options.view_as(WorkerOptions)
    worker_options.disk_size_gb = 30
    worker_options.max_num_workers = 1
    worker_options.machine_type = 'n1-standard-2'
    # worker_options.use_public_ips = True

    options.view_as(StandardOptions).runner = 'DataflowRunner'
    setup_option = options.view_as(SetupOptions)
    setup_option.requirements_file = "./requirements.txt"
    setup_option.view_as(SetupOptions).setup_file = './setup.py'

    # BQからBQ
    pipeline(options, known_args)

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run(sys.argv)


Writing dataflow_whatya_v1_kosfja.py


In [11]:
!ls -l

total 56
-rw-r--r-- 1 root root  720 Mar 30 15:47 adc.json
-rw-r--r-- 1 root root 5017 Mar 30 15:47 dataflow_oksky_chat_all.py
-rw-r--r-- 1 root root 5381 Mar 30 15:47 dataflow_whatya_v1_kosfja.py
-rw-r--r-- 1 root root 5577 Mar 30 15:47 dataflow_whatya_v2_all.py
-rw-r--r-- 1 root root 5573 Mar 30 15:47 dataflow_whatya_v2_wlp.py
-rw-r--r-- 1 root root 5585 Mar 30 15:47 dataflow_whatya_v2_zozo.py
-rw-r--r-- 1 root root    0 Mar 30 15:47 requirements.txt
drwxr-xr-x 1 root root 4096 Mar 23 23:22 sample_data
-rw-r--r-- 1 root root 5740 Mar 30 15:47 setup.py


## GCP環境設定

In [12]:
import os
os.environ["GCLOUD_PROJECT"] = "bwing-230309"
#os.environ["GCLOUD_PROJECT"] = "learnlearn-208609"

In [13]:
!gcloud config set project bwing-230309

Updated property [core/project].


In [14]:
#!env

## Dataflow実行

In [15]:
#!python dataflow_whatya_v2_all.py --project=bwing-230309 --region=asia-east1 --dataset=whatya --input=log_info_v2 --output=sentiment_test --from_date=2022-02-01 --to_date=2022-02-02 --bucket=gs://bwing-230309_dataflow

In [16]:
%%bash
### 今月分
#export FROM_DATE=$(date +"%Y-%m-%d" -d"`date +"%Y%m01"`")
#export TO_DATE=$(date +"%Y-%m-%d" -d"`date +"%Y%m01"` 1 days ago + 1 month")

### 先月分
export FROM_DATE=$(date +"%Y-%m-%d" -d"`date +"%Y%m01"` 1 month ago")
export TO_DATE=$(date +"%Y-%m-%d" -d"`date +"%Y%m01"` 1 days ago")

echo ${FROM_DATE}
echo ${TO_DATE}

export PROJECT=bwing-230309
export REGION=asia-east1
#export DATASET=whatya
#export INPUT=log_info
export OUTPUT=sentiment
#export OUTPUT=sentiment_test

# BigQuery のデータのロケーション（asia-northeast1）とGCSバケットの場所が一致すること
export BUCKET=gs://bwing-230309_dataflow

# WhatYa v2
python dataflow_whatya_v2_all.py --project=${PROJECT} --region=${REGION} --dataset=whatya --input=log_info_v2 --output=${OUTPUT} --from_date=${FROM_DATE} --to_date=${TO_DATE} --bucket=${BUCKET}

# WLP
python3 dataflow_whatya_v2_wlp.py --project=${PROJECT} --region=${REGION} --dataset=whatya_smb --input=log_info_v2 --output=${OUTPUT}  --from_date=${FROM_DATE} --to_date=${TO_DATE} --bucket=${BUCKET}

# zozo
python3 dataflow_whatya_v2_zozo.py --project=${PROJECT} --region=${REGION} --dataset=whatya --input=log_info_v2 --output=${OUTPUT}  --from_date=${FROM_DATE} --to_date=${TO_DATE} --bucket=${BUCKET}

# OKSKY
python3 dataflow_oksky_chat_all.py --project=${PROJECT} --region=${REGION} --dataset=oksky_chat --input=quest --output=${OUTPUT}  --from_date=${FROM_DATE} --to_date=${TO_DATE} --bucket=${BUCKET}

# WhatYa v1 (kosfja)
python3 dataflow_whatya_v1_kosfja.py --project=${PROJECT} --region=${REGION} --dataset=whatya --input=log_info --output=${OUTPUT}  --client=kosfja --from_date=${FROM_DATE} --to_date=${TO_DATE} --bucket=${BUCKET}


2022-02-01
2022-02-28


  temp_location = pcoll.pipeline.options.view_as(
  temp_location = p.options.view_as(GoogleCloudOptions).temp_location
INFO:apache_beam.runners.portability.stager:Executing command: ['/usr/bin/python3', '-m', 'pip', 'download', '--dest', '/tmp/dataflow-requirements-cache', '-r', '/tmp/tmpnsffgsnm/tmp_requirements.txt', '--exists-action', 'i', '--no-binary', ':all:']
INFO:apache_beam.runners.portability.stager:Executing command: ['/usr/bin/python3', 'setup.py', 'sdist', '--dist-dir', '/tmp/tmpsqhcd_wd']

INFO:apache_beam.runners.portability.stager:Downloading source distribution of the SDK from PyPi
INFO:apache_beam.runners.portability.stager:Executing command: ['/usr/bin/python3', '-m', 'pip', 'download', '--dest', '/tmp/tmpsqhcd_wd', 'apache-beam==2.37.0', '--no-deps', '--no-binary', ':all:']
INFO:apache_beam.runners.portability.stager:Staging SDK sources from PyPI: dataflow_python_sdk.tar
INFO:apache_beam.runners.portability.stager:Downloading binary distribution of the SDK from PyP

## Cloud Console

https://console.cloud.google.com/dataflow/jobs?project=bwing-230309&cloudshell=false&hl=ja